Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mongodb): exporter #5429 #6798

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbm-services/go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go 1.21.11
go 1.21

use (
bigdata/db-tools/dbactuator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical"
"encoding/json"
"fmt"
"os"
"path"
"time"

Expand All @@ -31,7 +32,6 @@ const backupTypePhysical string = "physical" // 未实现

// backupParams 备份任务参数,由前端传入
type backupParams struct {
// 这个参数是不是可以从bk-dbmon.conf中获得?
BkDbmInstance config.BkDbmLabel `json:"bk_dbm_instance"`
IP string `json:"ip"`
Port int `json:"port"`
Expand Down Expand Up @@ -136,15 +136,19 @@ func (s *backupJob) doLogicalBackup() error {
partialArgs.DbList, partialArgs.IgnoreDbList,
partialArgs.ColList, partialArgs.IgnoreColList)

cmdLineList, cmdLine, err := helper.DumpPartial(tmpPath, "dump.log", filter)
cmdLineList, cmdLine, err, _ := helper.DumpPartial(tmpPath, "dump.log", filter)

if err != nil {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
return errors.Wrap(err, "LogicalDumpPartial")
if errors.Is(err, logical.ErrorNoMatchDb) {
s.runtime.Logger.Warn("NoMatchDb")
return nil
} else {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
return errors.Wrap(err, "LogicalDumpPartial")
}
}
s.runtime.Logger.Info("exec cmd success, cmd: %+v", cmdLineList)
} else {
// backupType = "dumpAll"
cmdLine, err := helper.LogicalDumpAll(tmpPath, "dump.log")
if err != nil {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
Expand Down Expand Up @@ -180,6 +184,13 @@ func (s *backupJob) doLogicalBackup() error {
endTime = time.Now()
fSize, _ := util.GetFileSize(tarPath)
s.runtime.Logger.Info("backup file: %s size: %d", tarPath, fSize)

err = os.Chmod(tarPath, 0744)
if err != nil {
s.runtime.Logger.Error("chmod 0744 %s, err:%v", tarPath, err)
return errors.Wrap(err, "chmod")
}

// 上报备份记录。
task, err := backupsys.UploadFile(tarPath, s.ConfParams.BsTag)
// 如果此处失败,任务失败。
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package atommongodb

import (
"dbm-services/mongodb/db-tools/dbactuator/pkg/jobruntime"
"encoding/json"
"github.com/pkg/errors"
)

// hello is a job whose Run always reports success; it is used to test the job workflow.

// helloParams holds the parameters of the hello job. Init decodes the job's
// JSON payload into this struct.
// NOTE(review): the original comment says these values are passed in by the
// frontend — confirm against the caller.
type helloParams struct {
	IP            string `json:"ip"`
	Port          int    `json:"port"`
	AdminUsername string `json:"adminUsername"`
	AdminPassword string `json:"adminPassword"`
}

// helloJob is the hello atom job: the embedded BaseJob plus the decoded
// job parameters.
type helloJob struct {
	BaseJob
	// ConfParams is filled in by Init from the runtime payload.
	ConfParams *helloParams
}

// Param returns a tab-indented JSON rendering of a zero-valued helloParams,
// i.e. a template listing the fields this job accepts.
func (s *helloJob) Param() string {
	// Marshal this job's own parameter type (not backupParams, which belongs
	// to the backup job). Marshalling a struct made only of strings and ints
	// cannot fail, so the error is deliberately ignored.
	o, _ := json.MarshalIndent(helloParams{}, "", "\t")
	return string(o)
}

// NewHelloJob creates an empty helloJob and returns it as a jobruntime.JobRunner.
func NewHelloJob() jobruntime.JobRunner {
	job := new(helloJob)
	return job
}

// Name returns the name of this atom job, "mongodb_hello".
func (s *helloJob) Name() string {
	const jobName = "mongodb_hello"
	return jobName
}

// Run executes the atom job. It only logs "Run" through the runtime logger
// and returns nil.
func (s *helloJob) Run() error {
s.runtime.Logger.Info("Run")
return nil
}

// Retry returns the constant 2.
// NOTE(review): presumably the number of retries the job framework may
// attempt for this job — confirm against the JobRunner contract.
func (s *helloJob) Retry() uint {
	const retryTimes uint = 2
	return retryTimes
}

// Rollback has nothing to undo for this job and always returns nil.
func (s *helloJob) Rollback() error {
return nil
}

// Init stores the job runtime, refuses to run as root, and decodes the
// runtime's JSON payload into s.ConfParams.
//
// It returns an error when checkIsRootUser reports true or when the payload
// cannot be unmarshalled; both failures are also written to the runtime logger.
func (s *helloJob) Init(runtime *jobruntime.JobGenericRuntime) error {
	s.runtime = runtime
	// NOTE(review): an empty OsUser presumably means "no sudo / no user
	// switch"; the job is expected to be started by a regular user — confirm.
	s.OsUser = ""
	if checkIsRootUser() {
		s.runtime.Logger.Error("This job cannot be executed as root user")
		return errors.New("This job cannot be executed as root user")
	}
	if err := json.Unmarshal([]byte(s.runtime.PayloadDecoded), &s.ConfParams); err != nil {
		tmpErr := errors.Wrap(err, "payload json.Unmarshal failed")
		// Pass the message as an argument rather than as the format string:
		// JSON error text can contain '%', which a printf-style logger would
		// otherwise misinterpret as a formatting verb.
		s.runtime.Logger.Error("%s", tmpErr.Error())
		return tmpErr
	}

	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (job *installDbmonJob) Name() string {
func (job *installDbmonJob) Run() error {
// 生成配置文件 updateDbTool updateDbmon startDbmon
return job.runSteps([]stepFunc{
{"mkExporterConfigFile", job.mkExporterConfigFile},
{"updateConfigFile", job.updateConfigFile},
{"updateDbTool", job.updateDbTool},
{"updateToolKit", job.updateToolKit},
Expand Down Expand Up @@ -156,6 +157,21 @@ func compareServers(old, new []config.ConfServerItem) bool {
return true
}

// mkExporterConfigFile hands each configured server's port together with its
// username and password to common.WriteExporterConfigFile, so the exporter
// has the credentials it needs. It stops at, and returns, the first error.
func (job *installDbmonJob) mkExporterConfigFile() error {
	for _, server := range job.params.Servers {
		credentials := map[string]string{
			"username": server.UserName,
			"password": server.Password,
		}
		if err := common.WriteExporterConfigFile(server.Port, credentials); err != nil {
			return err
		}
	}
	return nil
}

func (job *installDbmonJob) updateConfigFile() error {
// consts.BkDbmonPath
configFile := consts.BkDbmonConfFile
Expand All @@ -174,11 +190,20 @@ func (job *installDbmonJob) updateConfigFile() error {
}

if oldConf != nil {
if oldConf.ReportSaveDir == conf.ReportSaveDir &&
oldConf.ReportLeftDay == conf.ReportLeftDay &&
oldConf.HttpAddress == conf.HttpAddress &&
oldConf.BkMonitorBeat == conf.BkMonitorBeat &&
compareServers(oldConf.Servers, conf.Servers) {
eq := make(map[string]bool)
eq["ReportSaveDir"] = oldConf.ReportSaveDir == conf.ReportSaveDir
eq["ReportLeftDay"] = oldConf.ReportLeftDay == conf.ReportLeftDay
eq["HttpAddress"] = oldConf.HttpAddress == conf.HttpAddress
eq["BkMonitorBeat"] = reflect.DeepEqual(oldConf.BkMonitorBeat, conf.BkMonitorBeat)
eq["Servers"] = compareServers(oldConf.Servers, conf.Servers)
ndiff := 0
for fieldName, same := range eq {
if !same {
ndiff++
job.runtime.Logger.Info("config file %s %q has been changed", configFile, fieldName)
}
}
if ndiff == 0 {
job.runtime.Logger.Info("config file %s has not been changed", configFile)
return nil
}
Expand All @@ -195,8 +220,6 @@ func (job *installDbmonJob) updateConfigFile() error {
err = cpfile(srcFile, dstFile)
if err != nil {
job.runtime.Logger.Warn("cpfile %s to %s failed, err:%v", srcFile, dstFile, err)
} else {
job.runtime.Logger.Info("cpfile %s to %s success", srcFile, dstFile)
}
}
}
Expand Down Expand Up @@ -236,18 +259,20 @@ func (job *installDbmonJob) updateToolKit() error {
prevFile := path.Join(consts.PackageCachePath, fileName)
newFile := path.Join(consts.PackageSavePath, fileName)
dstDir := "/home/mysql/dbtools/mg/"
dstFile := path.Join(dstDir, fileName)
util.MkDirsIfNotExists([]string{dstDir})
skipped, err := untarMedia(prevFile, newFile, dstDir)
if err != nil {
return errors.Wrap(err, "updateDbTool")
return errors.Wrap(err, "updateToolKit")
}
if skipped {
job.runtime.Logger.Info(
"updateDbTool %s to %s skipped. Because the MD5 value of the file has not changed.",
"updateToolKit %s to %s skipped. Because the MD5 value of the file has not changed.",
newFile, dstDir)
return nil
}
job.runtime.Logger.Info("updateDbTool %s to %s done", newFile, dstDir)
os.Chmod(dstFile, 0755)
job.runtime.Logger.Info("updateToolKit %s to %s done", newFile, dstDir)
cpfile(newFile, prevFile) // 只在成功untar后才能cp
return nil
}
Expand Down Expand Up @@ -283,7 +308,7 @@ func (job *installDbmonJob) startDbmon() error {
job.updateDbmonFlag = true // always restart bk-dbmon

if isRunning {
job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed. no need to restart. ")
job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed, restart dbmon")
if err := exec.Command("kill", "-9", strconv.Itoa(pid)).Run(); err != nil {
return errors.Wrap(err, "kill -9")
} else {
Expand All @@ -293,12 +318,14 @@ func (job *installDbmonJob) startDbmon() error {
} else {
job.runtime.Logger.Info("bk-dbmon is not running")
}
cmd := mycmd.New("/home/mysql/bk-dbmon/start.sh")
job.runtime.Logger.Info("exec %s", cmd.GetCmdLine2(false))
cmd.Run(30 * time.Second)

pid, err = startDbmon(consts.BkDbmonBin, consts.BkDbmonConfFile, "output.log")
pid, err = dbmonIsRunning(consts.BkDbmonBin)
if err != nil || pid <= 0 {
return errors.Errorf("start dbmon failed, err:%v, pid:%d", err, pid)
return errors.New("bk-dbmon start failed")
}

job.runtime.Logger.Info("start dbmon success, pid:%d", pid)
return nil
}
Expand Down Expand Up @@ -340,7 +367,6 @@ func untarMedia(prevFile, newFile, dstDir string) (skipped bool, err error) {
}
}

// 是否要将output打印出来呢?不了吧,太多了.
return
}

Expand Down Expand Up @@ -407,16 +433,3 @@ func dbmonIsRunning(comm string) (pid int, err error) {
}
return 0, nil
}

// startDbmon changes the working directory to the directory containing
// dbmonBin and launches the binary in the background with
// "--config <base name of configFilePath>", passing outputFileName to
// RunBackground.
//
// It returns whatever RunBackground returns (pid, err). A missing binary or a
// failed chdir is reported immediately with pid 0.
func startDbmon(dbmonBin, configFilePath, outputFileName string) (pid int, err error) {
	if !util.FileExists(dbmonBin) {
		// Return right away: previously this error was assigned but then
		// overwritten by the os.Chdir result below, so it was never reported.
		return 0, errors.New("dbmonBin not exists")
	}
	if err = os.Chdir(path.Dir(dbmonBin)); err != nil {
		return 0, errors.Wrap(err, "os.Chdir")
	}

	// The config is passed by base name; the chdir above makes it relative
	// to the binary's directory.
	return mycmd.New(dbmonBin, "--config", path.Base(configFilePath)).RunBackground(outputFileName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type pitrRecoverParam struct {
Port int `json:"port"`
AdminUsername string `json:"adminUsername"`
AdminPassword string `json:"adminPassword"`
InstanceType string `json:"instanceType"`
SrcAddr string `json:"srcAddr"` // ip:port
RecoverTimeStr string `json:"recoverTimeStr"` // recoverTime yyyy-mm-ddTHH:MM:SS
DryRun bool `json:"dryRun"` // 测试模式
Dir string `json:"dir"` // 备份文件存放目录.
recvoerTimeUnix uint32 `json:"-"`
// InstanceType string `json:"instanceType"`
}

type pitrRecoverJob struct {
Expand Down Expand Up @@ -97,7 +97,6 @@ func (s *pitrRecoverJob) checkDstMongo() error {
}
var notEmptyDb []string
for _, db := range dbList {

if mymongo.IsSysDb(db) {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type removeNsJob struct {
MongoInst *mymongo.MongoHost
MongoClient *mongo.Client
tmp struct {
Err error
NsList []logical.DbCollection
NsIndex map[string][]*mongo.IndexSpecification
}
Expand Down Expand Up @@ -105,6 +106,12 @@ func connectPrimary(host *mymongo.MongoHost) (client *mongo.Client, err error) {
}

func (s *removeNsJob) dropCollection() (err error) {

if s.tmp.Err != nil && errors.Is(s.tmp.Err, logical.ErrorNoMatchDb) {
s.runtime.Logger.Info("no matched database and collection.")
return nil
}

err = s.backupIndex()
if err != nil {
return err
Expand Down Expand Up @@ -285,7 +292,15 @@ func (s *removeNsJob) getNsList() (err error) {

dbColList, err := logical.GetDbCollectionWithFilter(s.MongoInst.Host, s.MongoInst.Port,
s.MongoInst.User, s.MongoInst.Pass, s.MongoInst.AuthDb, filter)

s.runtime.Logger.Info(fmt.Sprintf("GetDbCollectionWithFilter: return %+v %v", dbColList, err))

if err != nil {
// 如果没有匹配的库表,算作成功. 返回Error给后面的流程处理.
if errors.Is(err, logical.ErrorNoMatchDb) {
s.tmp.Err = err
return nil
}
return errors.Wrap(err, "GetDbCollectionWithFilter")
}
// skip sys db
Expand All @@ -297,10 +312,9 @@ func (s *removeNsJob) getNsList() (err error) {
}

if len(s.tmp.NsList) == 0 {
return errors.Errorf("no matched db and col found")
s.tmp.Err = logical.ErrorNoMatchDb
}
s.runtime.Logger.Info(fmt.Sprintf("getNsList:%+v", s.tmp.NsList))

return nil
} else {
s.tmp.NsList, err = logical.GetDbCollection(s.MongoInst.Host, s.MongoInst.Port,
Expand Down
Loading