Skip to content

Commit

Permalink
feat(mysql): dbbackup支持rocksdb备份 #6505
Browse files Browse the repository at this point in the history
  • Loading branch information
uriwang committed Sep 19, 2024
1 parent 7fde93e commit b6b9d3c
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ pre-*-bkcodeai

bkcodeai.json
package-lock.json

### PreCI ###
.codecc
13 changes: 9 additions & 4 deletions dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const (
BackupNone = "none"
)

const (
StorageEnginRocksdb = "rocksdb"
)

// backup role: dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/mysql.go
const (
// RoleMaster lower case
Expand Down Expand Up @@ -112,8 +116,9 @@ const MysqlCrondUrl = "http://127.0.0.1:9999"
const MysqlRotateBinlogInstallPath = "/home/mysql/mysql-rotatebinlog"

const (
ToolMydumper = "mydumper"
ToolMysqldump = "mysqldump"
ToolXtrabackup = "xtrabackup"
ToolTmysqldump = "tmysqldump"
ToolMydumper = "mydumper"
ToolMysqldump = "mysqldump"
ToolXtrabackup = "xtrabackup"
ToolTmysqldump = "tmysqldump"
ToolMyrocksHotbackup = "myrocks_hotbackup"
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Dumper interface {
}

// BuildDumper return logical or physical dumper
func BuildDumper(cnf *config.BackupConfig) (dumper Dumper, err error) {
func BuildDumper(cnf *config.BackupConfig, storageEngine string) (dumper Dumper, err error) {
if err = precheck.CheckBackupType(cnf); err != nil {
return nil, err
}
Expand Down Expand Up @@ -66,13 +66,21 @@ func BuildDumper(cnf *config.BackupConfig) (dumper Dumper, err error) {
if err := validate.GoValidateStruct(cnf.PhysicalBackup, false, false); err != nil {
return nil, err
}
dumper = &PhysicalDumper{
cnf: cnf,

if cst.StorageEnginRocksdb == storageEngine {
dumper = &PhysicalRocksdbDumper{
cfg: cnf,
}
} else {
dumper = &PhysicalDumper{
cnf: cnf,
}
}
} else {
logger.Log.Error(fmt.Sprintf("Unknown BackupType: %s", cnf.Public.BackupType))
err := fmt.Errorf("unknown BackupType: %s", cnf.Public.BackupType)
return nil, err
}

return dumper, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (p *PhysicalDumper) Execute(enableTimeOut bool) error {
_ = db.Close()
}()
if originVal, err := mysqlconn.SetGlobalVarAndReturnOrigin("slave_parallel_workers", "0", db); err != nil {
logger.Log.Error("set global slave_parallel_workers=0 failed, err: %s", err.Error())
logger.Log.Errorf("set global slave_parallel_workers=0 failed, err: %s", err.Error())
return err
} else {
logger.Log.Infof("will set global slave_parallel_workers=%s after backup finished", originVal)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package backupexe

import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"

"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/config"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/dbareport"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/logger"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/mysqlconn"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/util"
)

type PhysicalRocksdbDumper struct {
cfg *config.BackupConfig
dbbackupHome string
checkpointDir string
mysqlVersion string
isOfficial bool
rocksdbCmd string
storageEngine string
backupStartTime time.Time
backupEndTime time.Time
}

func (p *PhysicalRocksdbDumper) buildArgs() []string {

targetPath := filepath.Join(p.cfg.Public.BackupDir, p.cfg.Public.TargetName())

args := []string{
fmt.Sprintf("--host=%s", p.cfg.Public.MysqlHost),
fmt.Sprintf("--port=%d", p.cfg.Public.MysqlPort),
fmt.Sprintf("--user=%s", p.cfg.Public.MysqlUser),
fmt.Sprintf("--password=%s", p.cfg.Public.MysqlPasswd),
fmt.Sprintf("--checkpoint_dir=%s", p.checkpointDir),
fmt.Sprintf("--backup_dir=%s", targetPath),
"--stream=disabled",
}

if strings.ToLower(p.cfg.Public.MysqlRole) == cst.RoleSlave {
args = append(args, "--slave_info")
}

if strings.ToLower(p.cfg.Public.MysqlRole) == cst.RoleMaster {
args = append(args, "--master_info")
}

return args
}

func (p *PhysicalRocksdbDumper) initConfig(mysqlVersion string) error {
if p.cfg == nil {
return errors.New("rocksdb physical dumper config missed")
}

cmdPath, err := os.Executable()

if err != nil {
return err
}

p.dbbackupHome = filepath.Dir(cmdPath)
db, err := mysqlconn.InitConn(&p.cfg.Public)

if err != nil {
return err
}

p.mysqlVersion, p.isOfficial = util.VersionParser(mysqlVersion)
p.storageEngine, err = mysqlconn.GetStorageEngine(db)

if err != nil {
return err
}
p.storageEngine = strings.ToLower(p.storageEngine)

defer func() {
_ = db.Close()
}()

p.checkpointDir = fmt.Sprintf("%s/MyRocks_checkpoint", p.cfg.Public.BackupDir)
p.rocksdbCmd = "/bin/" + cst.ToolMyrocksHotbackup
BackupTool = cst.ToolMyrocksHotbackup
return nil
}

func (p *PhysicalRocksdbDumper) Execute(enableTimeOut bool) error {
p.backupStartTime = time.Now()
defer func() {
p.backupEndTime = time.Now()
}()

if p.storageEngine != cst.StorageEnginRocksdb {
err := fmt.Errorf("%s engine not support", p.storageEngine)
logger.Log.Error(err)
return err
}

_, err := os.Stat(p.checkpointDir)
if os.IsNotExist(err) {
err = os.MkdirAll(p.checkpointDir, 0755)
}

if err != nil {
logger.Log.Errorf("failed to create checkpoint(%s), err-msg:%s", p.checkpointDir, err)
return err
}

binPath := filepath.Join(p.dbbackupHome, p.rocksdbCmd)
args := p.buildArgs()

var cmd *exec.Cmd
backupCmd := fmt.Sprintf(`%s %s`, binPath, strings.Join(args, " "))

if enableTimeOut {
timeDiffUinx, err := GetMaxRunningTime(p.cfg.Public.BackupTimeOut)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), (time.Duration(timeDiffUinx))*time.Second)
defer cancel()

cmd = exec.CommandContext(ctx, "sh", "-c", backupCmd)
} else {
cmd = exec.Command("sh", "-c", backupCmd)
}

backuplogFilename := fmt.Sprintf("%s_backup_%d_%d.log", p.storageEngine, p.cfg.Public.MysqlPort, int(time.Now().Weekday()))
rocksdbBackuplogFilename := filepath.Join(p.dbbackupHome, "logs", backuplogFilename)

outFile, err := os.Create(rocksdbBackuplogFilename)

if err != nil {
logger.Log.Error("create log file failed: ", err)
return err
}

defer func() {
_ = outFile.Close()
}()

cmd.Stdout = outFile
cmd.Stderr = outFile
logger.Log.Info("rocksdb backup command: ", cmd.String())

err = cmd.Run()
if err != nil {
logger.Log.Error("run rocksdb physical backup failed: ", err)
return err
}

return nil
}

func (p *PhysicalRocksdbDumper) PrepareBackupMetaInfo(cnf *config.BackupConfig) (*dbareport.IndexContent, error) {
db, err := mysqlconn.InitConn(&cnf.Public)
if err != nil {
return nil, errors.WithMessage(err, "IndexContent")
}

defer func() {
_ = db.Close()
}()

storageEngine, err := mysqlconn.GetStorageEngine(db)
if err != nil {
return nil, err
}

storageEngine = strings.ToLower(storageEngine)

if storageEngine != "rocksdb" {
logger.Log.Errorf("unknown storage engine(%s)", storageEngine)
return nil, nil
}

xtrabackupBinlogInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_binlog_info")
xtrabackupSlaveInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_slave_info")

tmpFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "tmp_dbbackup_go.txt")

exepath, err := os.Executable()
if err != nil {
return nil, err
}

exepath = filepath.Dir(exepath)
qpressPath := filepath.Join(exepath, "bin", "qpress")

var metaInfo = dbareport.IndexContent{
BinlogInfo: dbareport.BinlogStatusInfo{},
}

if masterStatus, err := parseXtraBinlogInfo(qpressPath, xtrabackupBinlogInfoFileName, tmpFileName); err != nil {
return nil, err
} else {
metaInfo.BinlogInfo.ShowMasterStatus = masterStatus
metaInfo.BinlogInfo.ShowMasterStatus.MasterHost = cnf.Public.MysqlHost
metaInfo.BinlogInfo.ShowMasterStatus.MasterPort = cnf.Public.MysqlPort
}

if mysqlRole := strings.ToLower(cnf.Public.MysqlRole); mysqlRole == cst.RoleSlave || mysqlRole == cst.RoleRepeater {
if slaveStatus, err := parseXtraSlaveInfo(qpressPath, xtrabackupSlaveInfoFileName, tmpFileName); err != nil {
return nil, err
} else {
metaInfo.BinlogInfo.ShowSlaveStatus = slaveStatus
masterHost, masterPort, err := mysqlconn.ShowMysqlSlaveStatus(db)
if err != nil {
return nil, err
}
metaInfo.BinlogInfo.ShowSlaveStatus.MasterHost = masterHost
metaInfo.BinlogInfo.ShowSlaveStatus.MasterPort = masterPort
}
}

metaInfo.JudgeIsFullBackup(&cnf.Public)
if err = os.Remove(tmpFileName); err != nil {
return &metaInfo, err
}

return &metaInfo, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ func ExecuteBackup(cnf *config.BackupConfig) (*dbareport.IndexContent, error) {
if err != nil {
return nil, err
}

storageEngine, err := mysqlconn.GetStorageEngine(db)
if err != nil {
return nil, err
}
storageEngine = strings.ToLower(storageEngine)

mysqlVersion, isOfficial := util.VersionParser(versionStr)
XbcryptBin = GetXbcryptBin(mysqlVersion, isOfficial)

dumper, err := BuildDumper(cnf)
dumper, err := BuildDumper(cnf, storageEngine)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b6b9d3c

Please sign in to comment.