Skip to content

Commit

Permalink
feat(mysql): dbbackup支持rocksdb恢复数据 #8143
Browse files Browse the repository at this point in the history
  • Loading branch information
seanlook committed Nov 25, 2024
1 parent b5a7e30 commit b9eeb6e
Show file tree
Hide file tree
Showing 23 changed files with 246 additions and 200 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var MycnfCloneItemsDefault = []string{
"table_definition_cache",
"long_query_time",
"max_connections",
"group_concat_max_len",

"slave_parallel_workers",
"slave_parallel_type",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *PtTableChecksumComp) GenerateConfigFile() (err error) {
c.Params.BkBizId, c.Params.ClusterId, c.Params.MasterPort,
c.Params.InnerRole, "", c.Params.ImmuteDomain, c.Params.MasterIp,
c.GeneralParam.RuntimeAccountParam.MonitorUser, c.GeneralParam.RuntimeAccountParam.MonitorPwd,
"http://127.0.0.1:9999", logDir, c.Params.RuntimeHour, c.tools)
"http://127.0.0.1:9999", logDir, c.tools)

cfg.PtChecksum.Replicate = c.Params.ReplicateTable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type instanceInfo struct {
func NewRuntimeConfig(
bkBizId, clusterId, port int,
role, schedule, immuteDomain, ip, user, password, apiUrl, logDir string,
runtimeHour int,
tl *tools.ToolSet) *config.Config {
cfg := config.Config{
BkBizId: bkBizId,
Expand All @@ -72,7 +71,7 @@ func NewRuntimeConfig(
Args: []map[string]interface{}{
{
"name": "run-time",
"value": fmt.Sprintf("%dh", runtimeHour),
"value": "2h",
},
},
Replicate: fmt.Sprintf("%s.checksum", native.INFODBA_SCHEMA),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func generateRuntimeConfigIns(mcp *MySQLChecksumParam, instance *instanceInfo, r
cfg := NewRuntimeConfig(
instance.BkBizId, instance.ClusterId, instance.Port,
instance.Role, instance.Schedule, instance.ImmuteDomain, instance.Ip,
rtap.MonitorUser, rtap.MonitorPwd, mcp.ApiUrl, logDir, 2, tl)
rtap.MonitorUser, rtap.MonitorPwd, mcp.ApiUrl, logDir, tl)
cfg.SetFilter(nil, ignoreDbs, nil, nil)

b, err := yaml.Marshal(&cfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,15 +309,7 @@ func (c *OnMySQLComponent) instanceDropSourceDBs(port int) error {
func (c *OnMySQLComponent) instanceRecreateSourceTables(port int) error {
for db := range c.dbTablesMap {
stageDBName := generateStageDBName(c.Param.StageDBHeader, c.Param.FlowTimeStr, db)
stageTables, err := rpkg.ListDBTables(c.dbConn, stageDBName)
if err != nil {
logger.Error("list stage db tables on instance %d from %s failed: %s", port, stageDBName, err.Error())
return err
}

// 这里改成从 stage 库拿表列表, 在重试的时候才能幂等
//for _, table := range c.dbTablesMap[db] {
for _, table := range stageTables {
for _, table := range c.dbTablesMap[db] {
err := c.instanceRecreateSourceTable(port, db, stageDBName, table)
if err != nil {
logger.Error(
Expand All @@ -336,25 +328,25 @@ func (c *OnMySQLComponent) instanceRecreateSourceTables(port int) error {
}

func (c *OnMySQLComponent) instanceRecreateSourceTable(port int, dbName, stageDBName, tableName string) error {
//yes, err := rpkg.IsTableExistsIn(c.dbConn, tableName, stageDBName)
//if err != nil {
// logger.Error("check table %s exists in %s failed: %s", tableName, stageDBName, err.Error())
// return err
//}
//
//if !yes {
// err := fmt.Errorf("%s.%s not found", stageDBName, tableName)
// logger.Error("re create source table on instance %d failed: ", port, err.Error())
// return err
//}
//logger.Info("source table found in stage db on instance %d, try to truncate", port)
yes, err := rpkg.IsTableExistsIn(c.dbConn, tableName, stageDBName)
if err != nil {
logger.Error("check table %s exists in %s failed: %s", tableName, stageDBName, err.Error())
return err
}

if !yes {
err := fmt.Errorf("%s.%s not found", stageDBName, tableName)
logger.Error("re create source table on instance %d failed: ", port, err.Error())
return err
}
logger.Info("source table found in stage db on instance %d, try to truncate", port)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := c.dbConn.ExecContext(
_, err = c.dbConn.ExecContext(
ctx,
fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS `%s`.`%s` LIKE `%s`.`%s`",
"CREATE TABLE `%s`.`%s` LIKE `%s`.`%s`",
dbName, tableName, stageDBName, tableName,
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,12 @@ func backupData(cnf *config.BackupConfig) (err error) {
}
logger.Log.Info("parse config file: end")
if cnf.Public.DataSchemaGrant == cst.BackupNone {
logger.Log.Info("backup nothing, exit")
logger.Log.Infof("backup nothing for %d, exit", cnf.Public.MysqlPort)
return nil
}
if err := precheck.BeforeDump(cnf); err != nil {
return err
}

// 备份权限 backup priv info
// 注意:如果只备份权限,则走 backupexe.ExecuteBackup(cnf) 逻辑
// 如果还备份 schema/data,则走下面这个逻辑
Expand Down
2 changes: 2 additions & 0 deletions dbm-services/mysql/db-tools/mysql-dbbackup/docs/loadbackup.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Global Flags:

如果是 mydumper 备份出的文件,还可以通过 `--databases` 等选项控制导入的数据。mysqldump 导出的数据 使用 `--databases` 则会提示错误。

逻辑备份导入 `loadbackup logical` 是不需要指定字符集,dbbackup 会自动使用导出后备份文件里面的 charset (即 .index 里面的 backup_charset )

### 物理备份 恢复:
```
./dbbackup loadbackup physical --help
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package backupexe

import (
"database/sql"
"fmt"
"strings"

Expand All @@ -12,6 +13,7 @@ import (
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/dbareport"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/logger"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/mysqlconn"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/precheck"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/util"
)
Expand All @@ -24,7 +26,7 @@ type Dumper interface {
}

// BuildDumper return logical or physical dumper
func BuildDumper(cnf *config.BackupConfig, storageEngine string) (dumper Dumper, err error) {
func BuildDumper(cnf *config.BackupConfig, db *sql.DB) (dumper Dumper, err error) {
if cnf.Public.IfBackupGrantOnly() {
logger.Log.Infof("only backup grants for %d", cnf.Public.MysqlPort)
cnf.Public.BackupType = cst.BackupLogical
Expand All @@ -33,6 +35,10 @@ func BuildDumper(cnf *config.BackupConfig, storageEngine string) (dumper Dumper,
}
return dumper, nil
}
storageEngine, err := mysqlconn.GetStorageEngine(db)
if err != nil {
return nil, err
}
if err = precheck.CheckBackupType(cnf, storageEngine); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (p *PhysicalDumper) Execute(enableTimeOut bool) error {

err = cmd.Run()
if err != nil {
logger.Log.Error("run physical backup failed: ", err, stderr.Bytes())
logger.Log.Error("run physical backup failed: ", err, stderr.String())
return errors.WithMessage(err, stderr.String())
}
return nil
Expand Down Expand Up @@ -259,15 +259,15 @@ func (p *PhysicalDumper) PrepareBackupMetaInfo(cnf *config.BackupConfig) (*dbare
// parse xtrabackup_info
if err = parseXtraInfo(qpressPath, xtrabackupInfoFileName, tmpFileName, &metaInfo); err != nil {
logger.Log.Warnf("xtrabackup_info file not found, use current time as BackupEndTime, err: %s", err.Error())
metaInfo.BackupBeginTime, _ = time.Parse(time.DateTime, p.backupStartTime.Format(time.DateTime))
metaInfo.BackupEndTime, _ = time.Parse(time.DateTime, p.backupEndTime.Format(time.DateTime))
metaInfo.BackupBeginTime = cmutil.TimeToSecondPrecision(p.backupStartTime)
metaInfo.BackupEndTime = cmutil.TimeToSecondPrecision(p.backupEndTime)
}
// parse xtrabackup_timestamp_info
if err := parseXtraTimestamp(qpressPath, xtrabackupTimestampFileName, tmpFileName, &metaInfo); err != nil {
// 此时刚备份完成,还没有开始打包,这里把当前时间认为是 consistent_time,不完善!
logger.Log.Warnf("xtrabackup_timestamp_info file not found, "+
"use current time as Consistent Time, err: %s", err.Error())
metaInfo.BackupConsistentTime, _ = time.Parse(time.DateTime, p.backupEndTime.Format(time.DateTime))
metaInfo.BackupConsistentTime = cmutil.TimeToSecondPrecision(p.backupEndTime)
}
// parse xtrabackup_binlog_info 本机的 binlog file,pos
if masterStatus, err := parseXtraBinlogInfo(qpressPath, xtrabackupBinlogInfoFileName, tmpFileName); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/pkg/errors"

"dbm-services/common/go-pubpkg/cmutil"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/config"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/dbareport"
Expand Down Expand Up @@ -114,9 +115,9 @@ func (p *PhysicalRocksdbDumper) initConfig(mysqlVersion string) error {

// Execute Perform data recovery operations.
func (p *PhysicalRocksdbDumper) Execute(enableTimeOut bool) error {
p.backupStartTime = time.Now()
p.backupStartTime = cmutil.TimeToSecondPrecision(time.Now())
defer func() {
p.backupEndTime = time.Now()
p.backupEndTime = cmutil.TimeToSecondPrecision(time.Now())
}()

// the storage engine must be rocksdb
Expand Down Expand Up @@ -196,32 +197,41 @@ func (p *PhysicalRocksdbDumper) Execute(enableTimeOut bool) error {

// PrepareBackupMetaInfo generate the metadata of database backup
func (p *PhysicalRocksdbDumper) PrepareBackupMetaInfo(cnf *config.BackupConfig) (*dbareport.IndexContent, error) {

// parse the binglog position
xtrabackupBinlogInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_binlog_info")
xtrabackupSlaveInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_slave_info")

tmpFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "tmp_dbbackup_go.txt")
backupTargetDir := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName())
xtrabackupBinlogInfoFileName := filepath.Join(backupTargetDir, "xtrabackup_binlog_info")
xtrabackupSlaveInfoFileName := filepath.Join(backupTargetDir, "xtrabackup_slave_info")
xtrabackupTimestampFileName := filepath.Join(backupTargetDir, "xtrabackup_timestamp_info")
tmpFileName := filepath.Join(backupTargetDir, "tmp_dbbackup_go.txt")

// obtain the qpress command path
exepath, err := os.Executable()
if err != nil {
return nil, err
}

exepath = filepath.Dir(exepath)
qpressPath := filepath.Join(exepath, "bin", "qpress")

var metaInfo = dbareport.IndexContent{
BinlogInfo: dbareport.BinlogStatusInfo{},
BackupMetaFileBase: dbareport.BackupMetaFileBase{
BackupBeginTime: p.backupStartTime,
BackupEndTime: p.backupEndTime,
},
}

// parse xtrabackup_timestamp_info
if err := parseXtraTimestamp(qpressPath, xtrabackupTimestampFileName, tmpFileName, &metaInfo); err != nil {
// 此时刚备份完成,还没有开始打包,这里把当前时间认为是 consistent_time,不完善!
logger.Log.Warnf("xtrabackup_timestamp_info file not found, "+
"use current time as Consistent Time, err: %s", err.Error())
metaInfo.BackupConsistentTime = metaInfo.BackupEndTime
}
// parse the binlog
masterStatus, err := parseXtraBinlogInfo(qpressPath, xtrabackupBinlogInfoFileName, tmpFileName)
if err != nil {
logger.Log.Errorf("do not parse xtrabackup binlog file, file name:%s, errmsg:%s",
xtrabackupBinlogInfoFileName, err)
return nil, err
logger.Log.Errorf("can not parse xtrabackup_binlog_info, errmsg:%s", err)
} else {
logger.Log.Infof("xtrabackup_binlog_info binlog file:%s pos:%s",
masterStatus.BinlogFile, masterStatus.BinlogPos)
}

// save the master node status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,10 @@ func ExecuteBackup(cnf *config.BackupConfig) (*dbareport.IndexContent, error) {
return nil, err
}

storageEngine, err := mysqlconn.GetStorageEngine(db)
if err != nil {
return nil, err
}
storageEngine = strings.ToLower(storageEngine)

mysqlVersion, isOfficial := util.VersionParser(versionStr)
XbcryptBin = GetXbcryptBin(mysqlVersion, isOfficial)

dumper, err := BuildDumper(cnf, storageEngine) // 会在里面确定备份方式
dumper, err := BuildDumper(cnf, db) // 会在里面确定备份方式
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (l *LogicalLoader) Execute() (err error) {
if err != nil {
logger.Log.Error("myloader load backup failed: ", err, errStr)
// 尝试读取 myloader.log 里 CRITICAL 关键字
grepError := []string{"grep", "-E", "CRITICAL", logfile, "|", "tail", "-5"}
grepError := []string{"cat", logfile, "|", "grep -Ei 'CRITICAL|not found|error|fatal'", "|", "tail", "-5"}
errStrPrefix := fmt.Sprintf("tail 5 error from %s", logfile)
errStrDetail, _, _ := cmutil.ExecCommand(true, "", grepError[0], grepError[1:]...)
if len(strings.TrimSpace(errStr)) > 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package backupexe

import (
"bufio"
"fmt"
"os"
"os/exec"
Expand All @@ -11,6 +10,7 @@ import (
"time"

"dbm-services/common/go-pubpkg/cmutil"
"dbm-services/mysql/db-tools/dbactuator/pkg/util"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/config"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst"
"dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/dbareport"
Expand Down Expand Up @@ -173,59 +173,17 @@ func (p *PhysicalRocksdbLoader) initConfig(indexContent *dbareport.IndexContent)
return fmt.Errorf("the default file no exist, config file:%s", p.cfg.PhysicalLoad.DefaultsFile)
}

file, err := os.Open(p.cfg.PhysicalLoad.DefaultsFile)
if err != nil {
return fmt.Errorf("can not open the default file, config file:%s", p.cfg.PhysicalLoad.DefaultsFile)
}

defer file.Close()

// extract parameters from the configuration file.
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "datadir=") {
p.dataDir = strings.TrimPrefix(line, "datadir=")
p.dataDir = strings.TrimSpace(p.dataDir)
continue
}

if strings.HasPrefix(line, "innodb_log_group_home_dir=") {
p.innodbLogGroupHomeDir = strings.TrimPrefix(line, "innodb_log_group_home_dir=")
p.innodbLogGroupHomeDir = strings.TrimSpace(p.innodbLogGroupHomeDir)
continue
}

if strings.HasPrefix(line, "innodb_data_home_dir=") {
p.innodbDataHomeDir = strings.TrimPrefix(line, "innodb_data_home_dir=")
p.innodbDataHomeDir = strings.TrimSpace(p.innodbDataHomeDir)
continue
}

if strings.HasPrefix(line, "log_bin=") {
p.logbinDir = filepath.Dir(strings.TrimPrefix(line, "log_bin="))
p.logbinDir = strings.TrimSpace(p.logbinDir)
continue
}

if strings.HasPrefix(line, "relay-log=") {
p.relaylogDir = filepath.Dir(strings.TrimPrefix(line, "relay-log="))
p.relaylogDir = strings.TrimSpace(p.relaylogDir)
continue
}

if strings.HasPrefix(line, "slow_query_log_file=") {
p.slowQueryLogFile = filepath.Dir(strings.TrimPrefix(line, "slow_query_log_file="))
p.slowQueryLogFile = strings.TrimSpace(p.slowQueryLogFile)
continue
}

if strings.HasPrefix(line, "tmpdir=") {
p.tmpDir = strings.TrimPrefix(line, "tmpdir=")
p.tmpDir = strings.TrimSpace(p.tmpDir)
continue
}
cnfFile := &util.CnfFile{FileName: p.cfg.PhysicalLoad.DefaultsFile}
if err := cnfFile.Load(); err != nil {
return errors.WithMessagef(err, "fail to parse %s", p.cfg.PhysicalLoad.DefaultsFile)
}
p.dataDir, err = cnfFile.GetMySQLDataDir()
p.innodbLogGroupHomeDir, err = cnfFile.GetMysqldKeyVaule("innodb_log_group_home_dir")
p.innodbDataHomeDir, err = cnfFile.GetMysqldKeyVaule("innodb_data_home_dir")
p.relaylogDir, err = cnfFile.GetRelayLogDir()
p.logbinDir, _, err = cnfFile.GetBinLogDir()
p.tmpDir, err = cnfFile.GetMysqldKeyVaule("tmpdir")
p.slowQueryLogFile, err = cnfFile.GetMysqldKeyVaule("slow_query_log_file")

// store the base parameters
p.dbbackupHome = filepath.Dir(cmdPath)
Expand Down
Loading

0 comments on commit b9eeb6e

Please sign in to comment.