Skip to content

Commit

Permalink
feat(mongodb): exporter #5429
Browse files Browse the repository at this point in the history
  • Loading branch information
cycker committed Jul 16, 2024
1 parent e6c5f47 commit 80e489b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (job *installDbmonJob) Name() string {
func (job *installDbmonJob) Run() error {
// 生成配置文件 updateDbTool updateDbmon startDbmon
return job.runSteps([]stepFunc{
{"mkExporterConfigFile", job.mkExporterConfigFile},
{"updateConfigFile", job.updateConfigFile},
{"updateDbTool", job.updateDbTool},
{"updateToolKit", job.updateToolKit},
Expand Down Expand Up @@ -156,6 +157,21 @@ func compareServers(old, new []config.ConfServerItem) bool {
return true
}

// mkExporterConfigFile exporter需要的用户和密码.
func (job *installDbmonJob) mkExporterConfigFile() error {
var err error
for _, s := range job.params.Servers {
err = common.WriteExporterConfigFile(s.Port, map[string]string{
"username": s.UserName,
"password": s.Password,
})
if err != nil {
return err
}
}
return nil
}

func (job *installDbmonJob) updateConfigFile() error {
// consts.BkDbmonPath
configFile := consts.BkDbmonConfFile
Expand All @@ -174,11 +190,20 @@ func (job *installDbmonJob) updateConfigFile() error {
}

if oldConf != nil {
if oldConf.ReportSaveDir == conf.ReportSaveDir &&
oldConf.ReportLeftDay == conf.ReportLeftDay &&
oldConf.HttpAddress == conf.HttpAddress &&
oldConf.BkMonitorBeat == conf.BkMonitorBeat &&
compareServers(oldConf.Servers, conf.Servers) {
eq := make(map[string]bool)
eq["ReportSaveDir"] = oldConf.ReportSaveDir == conf.ReportSaveDir
eq["ReportLeftDay"] = oldConf.ReportLeftDay == conf.ReportLeftDay
eq["HttpAddress"] = oldConf.HttpAddress == conf.HttpAddress
eq["BkMonitorBeat"] = reflect.DeepEqual(oldConf.BkMonitorBeat, conf.BkMonitorBeat)
eq["Servers"] = compareServers(oldConf.Servers, conf.Servers)
ndiff := 0
for fieldName, same := range eq {
if !same {
ndiff++
job.runtime.Logger.Info("config file %s %s has been changed", configFile, fieldName)
}
}
if ndiff == 0 {
job.runtime.Logger.Info("config file %s has not been changed", configFile)
return nil
}
Expand Down Expand Up @@ -293,12 +318,14 @@ func (job *installDbmonJob) startDbmon() error {
} else {
job.runtime.Logger.Info("bk-dbmon is not running")
}
cmd := mycmd.New("/home/mysql/bk-dbmon/start.sh")
job.runtime.Logger.Info("exec %s", cmd.GetCmdLine2(false))
cmd.Run(30 * time.Second)

pid, err = startDbmon(consts.BkDbmonBin, consts.BkDbmonConfFile, "output.log")
pid, err = dbmonIsRunning(consts.BkDbmonBin)
if err != nil || pid <= 0 {
return errors.Errorf("start dbmon failed, err:%v, pid:%d", err, pid)
return errors.New("bk-dbmon start failed")
}

job.runtime.Logger.Info("start dbmon success, pid:%d", pid)
return nil
}
Expand Down Expand Up @@ -407,16 +434,3 @@ func dbmonIsRunning(comm string) (pid int, err error) {
}
return 0, nil
}

func startDbmon(dbmonBin, configFilePath, outputFileName string) (pid int, err error) {
if !util.FileExists(dbmonBin) {
err = errors.New("dbmonBin not exists")

}
if err = os.Chdir(path.Dir(dbmonBin)); err != nil {
err = errors.Wrap(err, "os.Chdir")
return
}

return mycmd.New(dbmonBin, "--config", path.Base(configFilePath)).RunBackground(outputFileName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type pitrRecoverParam struct {
Port int `json:"port"`
AdminUsername string `json:"adminUsername"`
AdminPassword string `json:"adminPassword"`
InstanceType string `json:"instanceType"`
SrcAddr string `json:"srcAddr"` // ip:port
RecoverTimeStr string `json:"recoverTimeStr"` // recoverTime yyyy-mm-ddTHH:MM:SS
DryRun bool `json:"dryRun"` // 测试模式
Dir string `json:"dir"` // 备份文件存放目录.
recvoerTimeUnix uint32 `json:"-"`
// InstanceType string `json:"instanceType"`
}

type pitrRecoverJob struct {
Expand Down Expand Up @@ -97,7 +97,6 @@ func (s *pitrRecoverJob) checkDstMongo() error {
}
var notEmptyDb []string
for _, db := range dbList {

if mymongo.IsSysDb(db) {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type restoreParam struct {
InstanceType string `json:"instanceType"`

Args struct {
RecoverDir string `json:"RecoverDir"` // /data/dbbak/recover_mg/
RecoverDir string `json:"recoverDir"` // /data/dbbak/recover_mg/
SrcFile []BsTaskArg `json:"srcFile"` // 目前只需要1个文件,但是为了兼容,还是使用数组.
IsPartial bool `json:"isPartial"` // 为true时,备份指定库和表
Oplog bool `json:"oplog"` // 是否备份oplog,只有在IsPartial为false可为true
Expand Down Expand Up @@ -73,13 +73,23 @@ func (s *restoreJob) Name() string {

// Run 运行原子任务
func (s *restoreJob) Run() error {
err := s.checkDstMongo()
if err != nil {
return errors.Wrap(err, "checkDstMongo")

type execFunc struct {
name string
f func() error
}
for _, f := range []execFunc{
{"checkDstMongo", s.checkDstMongo},
{"doLogicalRestore", s.doLogicalRestore},
} {
s.runtime.Logger.Info("Run %s start", f.name)
if err := f.f(); err != nil {
s.runtime.Logger.Error("Run %s failed. err %s", f.name, err.Error())
return errors.Wrap(err, f.name)
}
s.runtime.Logger.Info("Run %s done", f.name)
}
s.runtime.Logger.Info("checkDstMongo ok")
// 1. check dst mongo is ok
return s.doLogicalRestore()
return nil
}

// checkDstMongo 检查目标MongoDB中,没有要恢复的库和表.
Expand Down Expand Up @@ -124,18 +134,20 @@ func (s *restoreJob) doLogicalRestore() error {
}
log.Info("end untar file %s, dstDir %s", srcFilePath, dstDir)
dstDirWithDump := path.Join(dstDir, "dump")

// dbm系统产生的表. 不需要重复导入.
deletedCol, err := s.removeDbmSysNs(dstDirWithDump)
if err != nil {
return errors.Wrap(err, "removeDbmSysNs")
} else {
log.Info("removeDbmSysNs File: %+v", deletedCol)
}
// get DbCollection from Dir
dbColList, err := logical.GetDbCollectionFromDir(dstDirWithDump)
if err != nil {
return errors.Wrap(err, "GetDbCollectionFromDir")
}

for _, row := range dbColList {
if row.Db == "admin" {
continue
}
}

// 导入部分表时,要删除掉不需要的库和表文件.
s.removeNsByFilter(dbColList, dstDirWithDump)

Expand Down Expand Up @@ -174,6 +186,44 @@ func (s *restoreJob) doLogicalRestore() error {
return nil
}

// removeDbmSysNs
// 删除掉 admin/gcs.backup文件
// 删除掉 test/dbmon_heartbeat文件
// 删除掉 test/dbmon_heartbeat文件
func (s *restoreJob) removeDbmSysNs(dstDir string) ([]string, error) {
var colList = [][2]string{
{"admin", "gcs.backup"},
{"test", "dbmon.heartbeat"},
{"test", "dbmon_heartbeat"},
}
var deletedCol []string

for _, row := range colList {
db, col := row[0], row[1]
var toDelFileList []string
toDelFileList = append(toDelFileList,
fmt.Sprintf("%s/%s/%s.bson", dstDir, db, col),
fmt.Sprintf("%s/%s/%s.bson.gz", dstDir, db, col),
fmt.Sprintf("%s/%s/%s.metadata.json", dstDir, db, col),
)
ndel := 0
for _, file := range toDelFileList {
if !util.FileExists(file) {
continue
}
err := os.Remove(file)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("Remove %s", file))
}
ndel++
}
if ndel > 0 {
deletedCol = append(deletedCol, fmt.Sprintf("%s.%s", db, col))
}
}
return deletedCol, nil
}

func (s *restoreJob) removeNsByFilter(dbColList []logical.DbCollection, dstDir string) error {
// 导入部分表时,要删除掉不需要的库和表文件.
if !s.param.Args.IsPartial {
Expand All @@ -197,7 +247,7 @@ func (s *restoreJob) removeNsByFilter(dbColList []logical.DbCollection, dstDir s
continue
}
for _, col := range notMatchList {
var toDelFileList = []string{}
var toDelFileList []string
toDelFileList = append(toDelFileList,
fmt.Sprintf("%s/%s/%s.bson", dstDir, row.Db, col),
fmt.Sprintf("%s/%s/%s.bson.gz", dstDir, row.Db, col),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"dbm-services/mongodb/db-tools/dbactuator/pkg/util"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
Expand All @@ -30,7 +29,7 @@ func WriteExporterConfigFile(port int, data interface{}) (err error) {
}
confFile = getConfFileName(port)
fileData, _ = json.Marshal(data)
err = ioutil.WriteFile(confFile, fileData, 0755)
err = os.WriteFile(confFile, fileData, 0755)
if err != nil {
return err
}
Expand Down

0 comments on commit 80e489b

Please sign in to comment.