Skip to content

Commit

Permalink
refactor(dbm-services): 优化迁移spider表结构job超时问题 close #2691
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq authored and zhangzhw8 committed Dec 19, 2023
1 parent 35ffc47 commit e950aa3
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 25 deletions.
2 changes: 1 addition & 1 deletion dbm-services/common/db-resource/internal/svr/apply/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c ApplyRequestInputParam) LockKey() string {

const (
// SAME_SUBZONE_CROSS_SWTICH TODO
SAME_SUBZONE_CROSS_SWTICH = "SAME_ZONE_CROSS_SWITCH"
SAME_SUBZONE_CROSS_SWTICH = "SAME_SUBZONE_CROSS_SWTICH"
// SAME_SUBZONE TODO
SAME_SUBZONE = "SAME_SUBZONE"
// CROS_SUBZONE TODO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ type ImportSchemaFromLocalSpiderComp struct {

// ImportSchemaFromLocalSpiderParam TODO
type ImportSchemaFromLocalSpiderParam struct {
Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址
Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口
SpiderPort int `json:"spider_port" validate:"required,lt=65536,gte=3306"` // spider节点端口
Stream bool `json:"stream"` // mydumper stream myloader stream
DropBefore bool `json:"drop_before"` // 强制覆盖原来的表结构
Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址
Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口
SpiderPort int `json:"spider_port" validate:"required,lt=65536,gte=3306"` // spider节点端口
UseMydumper bool `json:"use_mydumper"` // use mydumper
Stream bool `json:"stream"` // mydumper stream myloader stream
DropBefore bool `json:"drop_before"` // 强制覆盖原来的表结构
Threads int `json:"threads"` // 可配置最大并发 for mydumper myloader
}

type importSchemaFromLocalSpiderRuntime struct {
Expand Down Expand Up @@ -129,9 +131,13 @@ func (c *ImportSchemaFromLocalSpiderComp) Init() (err error) {
}
}
c.tmpDumpFile = time.Now().Format(cst.TimeLayoutDir) + "_schema.sql"
c.maxThreads = runtime.NumCPU() / 3
if c.maxThreads < 1 {
c.maxThreads = 1
if c.Params.Threads > 0 {
c.maxThreads = c.Params.Threads
} else {
c.maxThreads = runtime.NumCPU() / 3
if c.maxThreads < 1 {
c.maxThreads = 2
}
}
return err
}
Expand All @@ -149,17 +155,21 @@ func (c *ImportSchemaFromLocalSpiderComp) Migrate() (err error) {
logger.Warn("set close tc_ignore_partitioning_for_create_table failed %s", errx.Error())
}
}()
if !c.Params.Stream {
return c.commonMigrate()
if c.Params.UseMydumper {
if c.Params.Stream {
return c.streamMigrate()
}
return c.mydumperCommonMigrate()
}
return c.streamMigrate()
return c.commonMigrate()

}

func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) {
logger.Info("will create mydumper.cnf ...")
mydumperCnf := path.Join(c.tmpDumpDir, "mydumper.cnf")
if !cmutil.FileExists(mydumperCnf) {
if err = os.WriteFile(mydumperCnf, []byte("[myloader_session_variables]\ntc_admin=0\n"), 0666); err != nil {
if err = os.WriteFile(mydumperCnf, []byte("[myloader_session_variables]\n tc_admin=0\n"), 0666); err != nil {
logger.Error("create mydumper.cnf failed %s", err.Error())
return err
}
Expand All @@ -173,7 +183,7 @@ func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) {
Pwd: c.adminPwd,
Charset: c.charset,
Options: mysqlutil.MyDumperOptions{
Threads: 2,
Threads: c.maxThreads,
NoData: true,
UseStream: true,
Regex: "^(?!(mysql|infodba_schema|information_schema|performance_schema|sys))",
Expand All @@ -187,7 +197,7 @@ func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) {
Charset: c.charset,
Options: mysqlutil.MyLoaderOptions{
NoData: true,
Threads: 2,
Threads: c.maxThreads,
UseStream: true,
DefaultsFile: mydumperCnf,
OverWriteTable: c.Params.DropBefore,
Expand All @@ -197,6 +207,57 @@ func (c *ImportSchemaFromLocalSpiderComp) streamMigrate() (err error) {
return streamFlow.Run()
}

// mydumperCommonMigrate TODO
func (c *ImportSchemaFromLocalSpiderComp) mydumperCommonMigrate() (err error) {
logger.Info("will create mydumper.cnf ...")
mydumperCnf := path.Join(c.tmpDumpDir, "mydumper.cnf")
if !cmutil.FileExists(mydumperCnf) {
if err = os.WriteFile(mydumperCnf, []byte("[myloader_session_variables]\n tc_admin=0\n"), 0666); err != nil {
logger.Error("create mydumper.cnf failed %s", err.Error())
return err
}
}
dumper := &mysqlutil.MyDumper{
Host: c.Params.Host,
Port: c.Params.SpiderPort,
User: c.adminUser,
Pwd: c.adminPwd,
Charset: c.charset,
DumpDir: c.tmpDumpDir,
Options: mysqlutil.MyDumperOptions{
Threads: c.maxThreads,
NoData: true,
UseStream: false,
Regex: "^(?!(mysql|infodba_schema|information_schema|performance_schema|sys))",
},
}
loader := &mysqlutil.MyLoader{
Host: c.Params.Host,
Port: c.Params.Port,
User: c.adminUser,
Pwd: c.adminPwd,
Charset: c.charset,
LoadDataDir: c.tmpDumpDir,
Options: mysqlutil.MyLoaderOptions{
NoData: true,
Threads: c.maxThreads,
UseStream: false,
DefaultsFile: mydumperCnf,
OverWriteTable: c.Params.DropBefore,
},
}
if err = dumper.Dumper(); err != nil {
logger.Error("use mydumper dump data failed %s", err.Error())
return err
}
logger.Info("dump data success ~")
if err = loader.Loader(); err != nil {
logger.Error("use myloader loader data failed %s", err.Error())
return err
}
return nil
}

// commonMigrate 使用mysqldump 原生方式去迁移
func (c *ImportSchemaFromLocalSpiderComp) commonMigrate() (err error) {
if len(c.dumpDbs) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ func (m *MyDumper) buildCommand() (command string) {
} else {
command += fmt.Sprintf(" -o %s ", m.DumpDir)
}
command += " --events --routines --triggers "
command += " --trx-consistency-only --long-query-retry-interval=10 --compress "
command += " --events --routines --triggers --verbose "
command += " --trx-consistency-only --long-query-retry-interval=10 "
if m.Options.NoData {
command += " --no-data "
}
Expand Down Expand Up @@ -367,6 +367,7 @@ type MyLoaderOptions struct {
func (m *MyLoader) buildCommand() (command string) {
command = fmt.Sprintf(`%s -h %s -P %d -u %s -p '%s' --set-names=%s `, m.BinPath, m.Host,
m.Port, m.User, m.Pwd, m.Charset)
command += " --enable-binlog --verbose "
if m.Options.UseStream {
command += " --stream "
} else {
Expand All @@ -387,6 +388,38 @@ func (m *MyLoader) buildCommand() (command string) {
return
}

// Loader do myloader load data
func (m *MyLoader) Loader() (err error) {
m.BinPath = filepath.Join(cst.DbbackupGoInstallPath, "bin/myloader")
if err = setEnv(); err != nil {
logger.Error("set env failed %s", err.Error())
return
}
var stderr string
stderr, err = osutil.StandardShellCommand(false, m.buildCommand())
if err != nil {
logger.Error("stderr %s", stderr)
return fmt.Errorf("stderr:%s,err:%w", stderr, err)
}
return nil
}

// Dumper do mydumper dump data
func (m *MyDumper) Dumper() (err error) {
m.BinPath = filepath.Join(cst.DbbackupGoInstallPath, "bin/mydumper")
if err = setEnv(); err != nil {
logger.Error("set env failed %s", err.Error())
return
}
var stderr string
stderr, err = osutil.StandardShellCommand(false, m.buildCommand())
if err != nil {
logger.Error("stderr %s", stderr)
return fmt.Errorf("stderr:%s,err:%w", stderr, err)
}
return nil
}

// MyStreamDumpLoad stream dumper loader
type MyStreamDumpLoad struct {
Dumper *MyDumper
Expand All @@ -400,8 +433,8 @@ func (s *MyStreamDumpLoad) buildCommand() (command string) {
return fmt.Sprintf("%s|%s", dumpCmd, loadCmd)
}

// setEnv TODO
func (m *MyStreamDumpLoad) setEnv() (err error) {
// setEnv mydumper or myloader lib path
func setEnv() (err error) {
var libPath []string
libPath = append(libPath, filepath.Join(cst.DbbackupGoInstallPath, "lib/libmydumper"))
oldLibs := strings.Split(os.Getenv("LD_LIBRARY_PATH"), ":")
Expand All @@ -411,7 +444,7 @@ func (m *MyStreamDumpLoad) setEnv() (err error) {

// Run Command Run
func (s *MyStreamDumpLoad) Run() (err error) {
if err = s.setEnv(); err != nil {
if err = setEnv(); err != nil {
logger.Error("set env failed %s", err.Error())
return
}
Expand Down
4 changes: 3 additions & 1 deletion dbm-ui/backend/db_services/meta_import/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ class TendbClusterStandardSerializer(serializers.Serializer):
bk_cloud_id = serializers.IntegerField(help_text=_("云区域ID"), default=0)
bk_biz_id = serializers.IntegerField(help_text=_("业务ID"))
cluster_ids = serializers.ListField(child=serializers.IntegerField(), help_text=_("待标准的集群列表"))
use_stream = serializers.BooleanField(help_text=_("使用mydumper流式备份导出导入"), required=False, default=True)
use_stream = serializers.BooleanField(help_text=_("是否使用mydumper流式备份迁移"), required=False, default=False)
drop_before = serializers.BooleanField(help_text=_("导入到tdbctl前,是否先删除"), required=False, default=False)
threads = serializers.IntegerField(help_text=_("业务ID"), required=False, default=0)
use_mydumper = serializers.BooleanField(help_text=_("是否使用mydumper,myloader迁移"), required=False, default=True)

def validate(self, attrs):
return attrs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self, root_id: str, data: Optional[Dict]):
"bk_biz_id": <bk_biz_id>,
"drop_before": False,
"use_stream": True,
"use_mydumper": true,
"cluster_ids": [1, 2, 3]
}
"""
Expand All @@ -66,16 +67,17 @@ def __init__(self, root_id: str, data: Optional[Dict]):
self.tdbctl_pass = ""
self.tdbctl_user = ""
self.chartset = ""
self.threads = 0
self.bk_cloud_id = 0
# stream mydumper & myloader 流式备份导入,否则退化成mysqldump方式
self.stream = True
self.use_mydumper = True
self.stream = False
if self.data.get("use_stream"):
self.stream = self.data["use_stream"]
# drop_before 导入到中控是否带上drop语句
self.drop_before = False
if self.data.get("drop_before"):
self.drop_before = self.data["drop_before"]

self.bk_cloud_id = 0
if self.data.get("bk_cloud_id"):
self.bk_cloud_id = self.data["bk_cloud_id"]

Expand Down Expand Up @@ -174,6 +176,7 @@ def run(self):
exec_act_kwargs = ExecActuatorKwargs(
bk_cloud_id=int(self.data["bk_cloud_id"]),
cluster_type=ClusterType.TenDBCluster,
job_timeout=86400,
)

for cluser_id in cluster_ids:
Expand Down Expand Up @@ -218,7 +221,7 @@ def run(self):
# self.tdbctl_user="xxx"
# self.tdbctl_pass="xxx"
# self.chartset = "utf8"
# 给spider节点下发tdbctl 介质 0
# 给spider节点下发tdbctl介质
migrate_pipeline = SubBuilder(root_id=self.root_id, data=self.data)
migrate_pipeline.add_act(
act_name=_("下发tdbCtl介质包"),
Expand Down Expand Up @@ -288,7 +291,9 @@ def run(self):
"ctl_port": ctl_port,
"spider_port": leader_spider.port,
"stream": self.stream,
"use_mydumper": self.use_mydumper,
"drop_before": self.drop_before,
"threads": self.threads,
}
exec_act_kwargs.exec_ip = primary_ctl_ip
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_import_schema_to_tdbctl_payload.__name__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def _execute(self, data, parent_data) -> bool:
else:
# 现在默认使用root账号来执行
common_kwargs["account_alias"] = DBA_ROOT_USER
if kwargs.get("job_timeout"):
common_kwargs["timeout"] = kwargs["job_timeout"]

resp = JobApi.fast_execute_script({**common_kwargs, **body}, raw=True)
self.log_info(f"{node_name} fast execute script response: {resp}")
Expand Down
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ExecActuatorBaseKwargs:
get_mysql_payload_func: str = None # 上下文中MysqlActPayload类的获取参数方法名称。空则传入None
cluster_type: str = None # 表示操作的集群类型,如果过程中不需要这个变量,则可以传None
cluster: dict = field(default_factory=dict) # 表示单据执行的集群信息,比如集群名称,集群域名等
job_timeout: int = 7200


@dataclass()
Expand Down
2 changes: 2 additions & 0 deletions dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,10 @@ def get_import_schema_to_tdbctl_payload(self, **kwargs):
"host": kwargs["ip"],
"port": self.cluster["ctl_port"],
"spider_port": self.cluster["spider_port"],
"use_mydumper": self.cluster["use_mydumper"],
"stream": self.cluster["stream"],
"drop_before": self.cluster["drop_before"],
"threads": self.cluster["threads"],
},
},
}
Expand Down

0 comments on commit e950aa3

Please sign in to comment.