diff --git a/dbm-services/common/db-resource/internal/svr/apply/api.go b/dbm-services/common/db-resource/internal/svr/apply/api.go index 2412d9dc6c..d340c01f5f 100644 --- a/dbm-services/common/db-resource/internal/svr/apply/api.go +++ b/dbm-services/common/db-resource/internal/svr/apply/api.go @@ -131,7 +131,7 @@ func (c ApplyRequestInputParam) LockKey() string { const ( // SAME_SUBZONE_CROSS_SWTICH TODO - SAME_SUBZONE_CROSS_SWTICH = "SAME_ZONE_CROSS_SWITCH" + SAME_SUBZONE_CROSS_SWTICH = "SAME_SUBZONE_CROSS_SWTICH" // SAME_SUBZONE TODO SAME_SUBZONE = "SAME_SUBZONE" // CROS_SUBZONE TODO diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_local_spider.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_local_spider.go index dc276e59e5..76f5c381b7 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_local_spider.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/import_schema_from_local_spider.go @@ -37,11 +37,13 @@ type ImportSchemaFromLocalSpiderComp struct { // ImportSchemaFromLocalSpiderParam TODO type ImportSchemaFromLocalSpiderParam struct { - Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址 - Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口 - SpiderPort int `json:"spider_port" validate:"required,lt=65536,gte=3306"` // spider节点端口 - Stream bool `json:"stream"` // mydumper stream myloader stream - DropBefore bool `json:"drop_before"` // 强制覆盖原来的表结构 + Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址 + Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口 + SpiderPort int `json:"spider_port" validate:"required,lt=65536,gte=3306"` // spider节点端口 + UseMydumper bool `json:"use_mydumper"` // use mydumper + Stream bool `json:"stream"` // mydumper stream myloader stream + DropBefore bool `json:"drop_before"` // 强制覆盖原来的表结构 + Threads int `json:"threads"` // 可配置最大并发 for mydumper myloader } type importSchemaFromLocalSpiderRuntime struct { @@ -129,9 +131,13 @@ func (c *ImportSchemaFromLocalSpiderComp) Init() (err error) { } } c.tmpDumpFile = time.Now().Format(cst.TimeLayoutDir) + "_schema.sql" - c.maxThreads = runtime.NumCPU() / 3 - if c.maxThreads < 1 { - c.maxThreads = 1 + if c.Params.Threads > 0 { + c.maxThreads = c.Params.Threads + } else { + c.maxThreads = runtime.NumCPU() / 3 + if c.maxThreads < 1 { + c.maxThreads = 2 + } } return err } @@ -149,17 +155,21 @@ func (c *ImportSchemaFromLocalSpiderComp) Migrate() (err error) { logger.Warn("set close tc_ignore_partitioning_for_create_table failed %s", errx.Error()) } }() - if !c.Params.Stream { - return c.commonMigrate() + if c.Params.UseMydumper { + if c.Params.Stream { + return c.streamMigrate() + } + return c.mydumperCommonMigrate() } - return c.streamMigrate() + return c.commonMigrate() + } func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) { logger.Info("will create mydumper.cnf ...") mydumperCnf := path.Join(c.tmpDumpDir, "mydumper.cnf") if !cmutil.FileExists(mydumperCnf) { - if err = os.WriteFile(mydumperCnf, []byte("[myloader_session_variables]\ntc_admin=0\n"), 0666); err != nil { + if err = os.WriteFile(mydumperCnf, []byte("[myloader_session_variables]\n tc_admin=0\n"), 0666); err != nil { logger.Error("create mydumper.cnf failed %s", err.Error()) return err } @@ -173,7 +183,7 @@ func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) { Pwd: c.adminPwd, Charset: c.charset, Options: mysqlutil.MyDumperOptions{ - Threads: 2, + Threads: c.maxThreads, NoData: true, UseStream: true, Regex: "^(?!(mysql|infodba_schema|information_schema|performance_schema|sys))", @@ -187,7 +197,7 @@ func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) { Charset: c.charset, Options: mysqlutil.MyLoaderOptions{ NoData: true, - Threads: 2, + Threads: c.maxThreads, UseStream: true, DefaultsFile: mydumperCnf, OverWriteTable: c.Params.DropBefore, @@ -197,6 +207,57 @@ func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) { return streamFlow.Run() } +// mydumperCommonMigrate TODO +func (c *ImportSchemaFromLocalSpiderComp) mydumperCommonMigrate() (err error) { + logger.Info("will create mydumper.cnf ...") + mydumperCnf := path.Join(c.tmpDumpDir, "mydumper.cnf") + if !cmutil.FileExists(mydumperCnf) { + if err = os.WriteFile(mydumperCnf, []byte("[myloader_session_variables]\n tc_admin=0\n"), 0666); err != nil { + logger.Error("create mydumper.cnf failed %s", err.Error()) + return err + } + } + dumper := &mysqlutil.MyDumper{ + Host: c.Params.Host, + Port: c.Params.SpiderPort, + User: c.adminUser, + Pwd: c.adminPwd, + Charset: c.charset, + DumpDir: c.tmpDumpDir, + Options: mysqlutil.MyDumperOptions{ + Threads: c.maxThreads, + NoData: true, + UseStream: false, + Regex: "^(?!(mysql|infodba_schema|information_schema|performance_schema|sys))", + }, + } + loader := &mysqlutil.MyLoader{ + Host: c.Params.Host, + Port: c.Params.Port, + User: c.adminUser, + Pwd: c.adminPwd, + Charset: c.charset, + LoadDataDir: c.tmpDumpDir, + Options: mysqlutil.MyLoaderOptions{ + NoData: true, + Threads: c.maxThreads, + UseStream: false, + DefaultsFile: mydumperCnf, + OverWriteTable: c.Params.DropBefore, + }, + } + if err = dumper.Dumper(); err != nil { + logger.Error("use mydumper dump data failed %s", err.Error()) + return err + } + logger.Info("dump data success ~") + if err = loader.Loader(); err != nil { + logger.Error("use myloader loader data failed %s", err.Error()) + return err + } + return nil +} + // commonMigrate 使用mysqldump 原生方式去迁移 func (c *ImportSchemaFromLocalSpiderComp) commonMigrate() (err error) { if len(c.dumpDbs) == 0 { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go index a1bce33c3b..2c009a9f39 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysql_dumper.go @@ -329,8 +329,8 @@ func (m *MyDumper) buildCommand() (command string) { } else { command += fmt.Sprintf(" -o %s ", m.DumpDir) } - command += " --events --routines --triggers " - command += " --trx-consistency-only --long-query-retry-interval=10 --compress " + command += " --events --routines --triggers --verbose " + command += " --trx-consistency-only --long-query-retry-interval=10 " if m.Options.NoData { command += " --no-data " } @@ -367,6 +367,7 @@ type MyLoaderOptions struct { func (m *MyLoader) buildCommand() (command string) { command = fmt.Sprintf(`%s -h %s -P %d -u %s -p '%s' --set-names=%s `, m.BinPath, m.Host, m.Port, m.User, m.Pwd, m.Charset) + command += " --enable-binlog --verbose " if m.Options.UseStream { command += " --stream " } else { @@ -387,6 +388,38 @@ func (m *MyLoader) buildCommand() (command string) { return } +// Loader do myloader load data +func (m *MyLoader) Loader() (err error) { + m.BinPath = filepath.Join(cst.DbbackupGoInstallPath, "bin/myloader") + if err = setEnv(); err != nil { + logger.Error("set env failed %s", err.Error()) + return + } + var stderr string + stderr, err = osutil.StandardShellCommand(false, m.buildCommand()) + if err != nil { + logger.Error("stderr %s", stderr) + return fmt.Errorf("stderr:%s,err:%w", stderr, err) + } + return nil +} + +// Dumper do mydumper dump data +func (m *MyDumper) Dumper() (err error) { + m.BinPath = filepath.Join(cst.DbbackupGoInstallPath, "bin/mydumper") + if err = setEnv(); err != nil { + logger.Error("set env failed %s", err.Error()) + return + } + var stderr string + stderr, err = osutil.StandardShellCommand(false, m.buildCommand()) + if err != nil { + logger.Error("stderr %s", stderr) + return fmt.Errorf("stderr:%s,err:%w", stderr, err) + } + return nil +} + // MyStreamDumpLoad stream dumper loader type MyStreamDumpLoad struct { Dumper *MyDumper @@ -400,8 +433,8 @@ func (s *MyStreamDumpLoad) buildCommand() (command string) { return fmt.Sprintf("%s|%s", dumpCmd, loadCmd) } -// setEnv TODO -func (m *MyStreamDumpLoad) setEnv() (err error) { +// setEnv mydumper or myloader lib path +func setEnv() (err error) { var libPath []string libPath = append(libPath, filepath.Join(cst.DbbackupGoInstallPath, "lib/libmydumper")) oldLibs := strings.Split(os.Getenv("LD_LIBRARY_PATH"), ":") @@ -411,7 +444,7 @@ func (m *MyStreamDumpLoad) setEnv() (err error) { // Run Command Run func (s *MyStreamDumpLoad) Run() (err error) { - if err = s.setEnv(); err != nil { + if err = setEnv(); err != nil { logger.Error("set env failed %s", err.Error()) return } diff --git a/dbm-ui/backend/db_services/meta_import/serializers.py b/dbm-ui/backend/db_services/meta_import/serializers.py index c5315fdc69..efa5ce162f 100644 --- a/dbm-ui/backend/db_services/meta_import/serializers.py +++ b/dbm-ui/backend/db_services/meta_import/serializers.py @@ -79,8 +79,10 @@ class TendbClusterStandardSerializer(serializers.Serializer): bk_cloud_id = serializers.IntegerField(help_text=_("云区域ID"), default=0) bk_biz_id = serializers.IntegerField(help_text=_("业务ID")) cluster_ids = serializers.ListField(child=serializers.IntegerField(), help_text=_("待标准的集群列表")) - use_stream = serializers.BooleanField(help_text=_("使用mydumper流式备份导出导入"), required=False, default=True) + use_stream = serializers.BooleanField(help_text=_("是否使用mydumper流式备份迁移"), required=False, default=False) drop_before = serializers.BooleanField(help_text=_("导入到tdbctl前,是否先删除"), required=False, default=False) + threads = serializers.IntegerField(help_text=_("业务ID"), required=False, default=0) + use_mydumper = serializers.BooleanField(help_text=_("是否使用mydumper,myloader迁移"), required=False, default=True) def validate(self, attrs): return attrs diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/migrate_spider_cluster_from_gcs.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/migrate_spider_cluster_from_gcs.py index aee208f137..bb34ff1f2e 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/migrate_spider_cluster_from_gcs.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/migrate_spider_cluster_from_gcs.py @@ -57,6 +57,7 @@ def __init__(self, root_id: str, data: Optional[Dict]): "bk_biz_id": , "drop_before": False, "use_stream": True, + "use_mydumper": true, "cluster_ids": [1, 2, 3] } """ @@ -66,16 +67,17 @@ def __init__(self, root_id: str, data: Optional[Dict]): self.tdbctl_pass = "" self.tdbctl_user = "" self.chartset = "" + self.threads = 0 + self.bk_cloud_id = 0 # stream mydumper & myloader 流式备份导入,否则退化成mysqldump方式 - self.stream = True + self.use_mydumper = True + self.stream = False if self.data.get("use_stream"): self.stream = self.data["use_stream"] # drop_before 导入到中控是否带上drop语句 self.drop_before = False if self.data.get("drop_before"): self.drop_before = self.data["drop_before"] - - self.bk_cloud_id = 0 if self.data.get("bk_cloud_id"): self.bk_cloud_id = self.data["bk_cloud_id"] @@ -174,6 +176,7 @@ def run(self): exec_act_kwargs = ExecActuatorKwargs( bk_cloud_id=int(self.data["bk_cloud_id"]), cluster_type=ClusterType.TenDBCluster, + job_timeout=86400, ) for cluser_id in cluster_ids: @@ -218,7 +221,7 @@ def run(self): # self.tdbctl_user="xxx" # self.tdbctl_pass="xxx" # self.chartset = "utf8" - # 给spider节点下发tdbctl 介质 0 + # 给spider节点下发tdbctl介质 migrate_pipeline = SubBuilder(root_id=self.root_id, data=self.data) migrate_pipeline.add_act( act_name=_("下发tdbCtl介质包"), @@ -288,7 +291,9 @@ def run(self): "ctl_port": ctl_port, "spider_port": leader_spider.port, "stream": self.stream, + "use_mydumper": self.use_mydumper, "drop_before": self.drop_before, + "threads": self.threads, } exec_act_kwargs.exec_ip = primary_ctl_ip exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_import_schema_to_tdbctl_payload.__name__ diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/exec_actuator_script.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/exec_actuator_script.py index 6a384f50f2..a16748c75b 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mysql/exec_actuator_script.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/exec_actuator_script.py @@ -133,6 +133,8 @@ def _execute(self, data, parent_data) -> bool: else: # 现在默认使用root账号来执行 common_kwargs["account_alias"] = DBA_ROOT_USER + if kwargs.get("job_timeout"): + common_kwargs["timeout"] = kwargs["job_timeout"] resp = JobApi.fast_execute_script({**common_kwargs, **body}, raw=True) self.log_info(f"{node_name} fast execute script response: {resp}") diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py index 6cd967d95d..015c2f7c44 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py @@ -36,6 +36,7 @@ class ExecActuatorBaseKwargs: get_mysql_payload_func: str = None # 上下文中MysqlActPayload类的获取参数方法名称。空则传入None cluster_type: str = None # 表示操作的集群类型,如果过程中不需要这个变量,则可以传None cluster: dict = field(default_factory=dict) # 表示单据执行的集群信息,比如集群名称,集群域名等 + job_timeout: int = 7200 @dataclass() diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 592382ad47..5605afb123 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -306,8 +306,10 @@ def get_import_schema_to_tdbctl_payload(self, **kwargs): "host": kwargs["ip"], "port": self.cluster["ctl_port"], "spider_port": self.cluster["spider_port"], + "use_mydumper": self.cluster["use_mydumper"], "stream": self.cluster["stream"], "drop_before": self.cluster["drop_before"], + "threads": self.cluster["threads"], }, }, }