Skip to content

Commit

Permalink
feat(mongodb): create mongodb pwfile #5429
Browse files Browse the repository at this point in the history
  • Loading branch information
cycker committed Sep 10, 2024
1 parent e6c5f47 commit ba17a23
Show file tree
Hide file tree
Showing 29 changed files with 415 additions and 151 deletions.
2 changes: 1 addition & 1 deletion dbm-services/go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go 1.21.11
go 1.21

use (
bigdata/db-tools/dbactuator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical"
"encoding/json"
"fmt"
"os"
"path"
"time"

Expand All @@ -31,7 +32,6 @@ const backupTypePhysical string = "physical" // 未实现

// backupParams 备份任务参数,由前端传入
type backupParams struct {
// 这个参数是不是可以从bk-dbmon.conf中获得?
BkDbmInstance config.BkDbmLabel `json:"bk_dbm_instance"`
IP string `json:"ip"`
Port int `json:"port"`
Expand Down Expand Up @@ -136,15 +136,19 @@ func (s *backupJob) doLogicalBackup() error {
partialArgs.DbList, partialArgs.IgnoreDbList,
partialArgs.ColList, partialArgs.IgnoreColList)

cmdLineList, cmdLine, err := helper.DumpPartial(tmpPath, "dump.log", filter)
cmdLineList, cmdLine, err, _ := helper.DumpPartial(tmpPath, "dump.log", filter)

if err != nil {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
return errors.Wrap(err, "LogicalDumpPartial")
if errors.Is(err, logical.ErrorNoMatchDb) {
s.runtime.Logger.Warn("NoMatchDb")
return nil
} else {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
return errors.Wrap(err, "LogicalDumpPartial")
}
}
s.runtime.Logger.Info("exec cmd success, cmd: %+v", cmdLineList)
} else {
// backupType = "dumpAll"
cmdLine, err := helper.LogicalDumpAll(tmpPath, "dump.log")
if err != nil {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
Expand Down Expand Up @@ -180,6 +184,13 @@ func (s *backupJob) doLogicalBackup() error {
endTime = time.Now()
fSize, _ := util.GetFileSize(tarPath)
s.runtime.Logger.Info("backup file: %s size: %d", tarPath, fSize)

err = os.Chmod(tarPath, 0744)
if err != nil {
s.runtime.Logger.Error("chmod 0744 %s, err:%v", tarPath, err)
return errors.Wrap(err, "chmod")
}

// 上报备份记录。
task, err := backupsys.UploadFile(tarPath, s.ConfParams.BsTag)
// 如果此处失败,任务失败。
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package atommongodb

import (
"dbm-services/mongodb/db-tools/dbactuator/pkg/jobruntime"
"encoding/json"
"github.com/pkg/errors"
)

// hello 总是返回成功. 用于测试流程

// helloParams 备份任务参数,由前端传入
type helloParams struct {
IP string `json:"ip"`
Port int `json:"port"`
AdminUsername string `json:"adminUsername"`
AdminPassword string `json:"adminPassword"`
}

type helloJob struct {
BaseJob
ConfParams *helloParams
}

func (s *helloJob) Param() string {
o, _ := json.MarshalIndent(backupParams{}, "", "\t")
return string(o)
}

// NewHelloJob 实例化结构体
func NewHelloJob() jobruntime.JobRunner {
return &helloJob{}
}

// Name 获取原子任务的名字
func (s *helloJob) Name() string {
return "mongodb_hello"
}

// Run 运行原子任务
func (s *helloJob) Run() error {
s.runtime.Logger.Info("Run")
return nil
}

// Retry 重试
func (s *helloJob) Retry() uint {
// do nothing
return 2
}

// Rollback 回滚
func (s *helloJob) Rollback() error {
return nil
}

// Init 初始化
func (s *helloJob) Init(runtime *jobruntime.JobGenericRuntime) error {
// 获取安装参数
s.runtime = runtime
s.OsUser = "" // 备份进程,不再需要sudo,请以普通用户执行
if checkIsRootUser() {
s.runtime.Logger.Error("This job cannot be executed as root user")
return errors.New("This job cannot be executed as root user")
}
if err := json.Unmarshal([]byte(s.runtime.PayloadDecoded), &s.ConfParams); err != nil {
tmpErr := errors.Wrap(err, "payload json.Unmarshal failed")
s.runtime.Logger.Error(tmpErr.Error())
return tmpErr
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (job *installDbmonJob) Name() string {
func (job *installDbmonJob) Run() error {
// 生成配置文件 updateDbTool updateDbmon startDbmon
return job.runSteps([]stepFunc{
{"mkExporterConfigFile", job.mkExporterConfigFile},
{"updateConfigFile", job.updateConfigFile},
{"updateDbTool", job.updateDbTool},
{"updateToolKit", job.updateToolKit},
Expand Down Expand Up @@ -156,6 +157,21 @@ func compareServers(old, new []config.ConfServerItem) bool {
return true
}

// mkExporterConfigFile exporter需要的用户和密码.
func (job *installDbmonJob) mkExporterConfigFile() error {
var err error
for _, s := range job.params.Servers {
err = common.WriteExporterConfigFile(s.Port, map[string]string{
"username": s.UserName,
"password": s.Password,
})
if err != nil {
return err
}
}
return nil
}

func (job *installDbmonJob) updateConfigFile() error {
// consts.BkDbmonPath
configFile := consts.BkDbmonConfFile
Expand All @@ -174,11 +190,20 @@ func (job *installDbmonJob) updateConfigFile() error {
}

if oldConf != nil {
if oldConf.ReportSaveDir == conf.ReportSaveDir &&
oldConf.ReportLeftDay == conf.ReportLeftDay &&
oldConf.HttpAddress == conf.HttpAddress &&
oldConf.BkMonitorBeat == conf.BkMonitorBeat &&
compareServers(oldConf.Servers, conf.Servers) {
eq := make(map[string]bool)
eq["ReportSaveDir"] = oldConf.ReportSaveDir == conf.ReportSaveDir
eq["ReportLeftDay"] = oldConf.ReportLeftDay == conf.ReportLeftDay
eq["HttpAddress"] = oldConf.HttpAddress == conf.HttpAddress
eq["BkMonitorBeat"] = reflect.DeepEqual(oldConf.BkMonitorBeat, conf.BkMonitorBeat)
eq["Servers"] = compareServers(oldConf.Servers, conf.Servers)
ndiff := 0
for fieldName, same := range eq {
if !same {
ndiff++
job.runtime.Logger.Info("config file %s %q has been changed", configFile, fieldName)
}
}
if ndiff == 0 {
job.runtime.Logger.Info("config file %s has not been changed", configFile)
return nil
}
Expand All @@ -195,8 +220,6 @@ func (job *installDbmonJob) updateConfigFile() error {
err = cpfile(srcFile, dstFile)
if err != nil {
job.runtime.Logger.Warn("cpfile %s to %s failed, err:%v", srcFile, dstFile, err)
} else {
job.runtime.Logger.Info("cpfile %s to %s success", srcFile, dstFile)
}
}
}
Expand Down Expand Up @@ -236,18 +259,20 @@ func (job *installDbmonJob) updateToolKit() error {
prevFile := path.Join(consts.PackageCachePath, fileName)
newFile := path.Join(consts.PackageSavePath, fileName)
dstDir := "/home/mysql/dbtools/mg/"
dstFile := path.Join(dstDir, fileName)
util.MkDirsIfNotExists([]string{dstDir})
skipped, err := untarMedia(prevFile, newFile, dstDir)
if err != nil {
return errors.Wrap(err, "updateDbTool")
return errors.Wrap(err, "updateToolKit")
}
if skipped {
job.runtime.Logger.Info(
"updateDbTool %s to %s skipped. Because the MD5 value of the file has not changed.",
"updateToolKit %s to %s skipped. Because the MD5 value of the file has not changed.",
newFile, dstDir)
return nil
}
job.runtime.Logger.Info("updateDbTool %s to %s done", newFile, dstDir)
os.Chmod(dstFile, 0755)
job.runtime.Logger.Info("updateToolKit %s to %s done", newFile, dstDir)
cpfile(newFile, prevFile) // 只在成功untar后才能cp
return nil
}
Expand Down Expand Up @@ -283,7 +308,7 @@ func (job *installDbmonJob) startDbmon() error {
job.updateDbmonFlag = true // always restart bk-dbmon

if isRunning {
job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed. no need to restart. ")
job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed, restart dbmon")
if err := exec.Command("kill", "-9", strconv.Itoa(pid)).Run(); err != nil {
return errors.Wrap(err, "kill -9")
} else {
Expand All @@ -293,12 +318,14 @@ func (job *installDbmonJob) startDbmon() error {
} else {
job.runtime.Logger.Info("bk-dbmon is not running")
}
cmd := mycmd.New("/home/mysql/bk-dbmon/start.sh")
job.runtime.Logger.Info("exec %s", cmd.GetCmdLine2(false))
cmd.Run(30 * time.Second)

pid, err = startDbmon(consts.BkDbmonBin, consts.BkDbmonConfFile, "output.log")
pid, err = dbmonIsRunning(consts.BkDbmonBin)
if err != nil || pid <= 0 {
return errors.Errorf("start dbmon failed, err:%v, pid:%d", err, pid)
return errors.New("bk-dbmon start failed")
}

job.runtime.Logger.Info("start dbmon success, pid:%d", pid)
return nil
}
Expand Down Expand Up @@ -340,7 +367,6 @@ func untarMedia(prevFile, newFile, dstDir string) (skipped bool, err error) {
}
}

// 是否要将output打印出来呢?不了吧,太多了.
return
}

Expand Down Expand Up @@ -407,16 +433,3 @@ func dbmonIsRunning(comm string) (pid int, err error) {
}
return 0, nil
}

func startDbmon(dbmonBin, configFilePath, outputFileName string) (pid int, err error) {
if !util.FileExists(dbmonBin) {
err = errors.New("dbmonBin not exists")

}
if err = os.Chdir(path.Dir(dbmonBin)); err != nil {
err = errors.Wrap(err, "os.Chdir")
return
}

return mycmd.New(dbmonBin, "--config", path.Base(configFilePath)).RunBackground(outputFileName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type pitrRecoverParam struct {
Port int `json:"port"`
AdminUsername string `json:"adminUsername"`
AdminPassword string `json:"adminPassword"`
InstanceType string `json:"instanceType"`
SrcAddr string `json:"srcAddr"` // ip:port
RecoverTimeStr string `json:"recoverTimeStr"` // recoverTime yyyy-mm-ddTHH:MM:SS
DryRun bool `json:"dryRun"` // 测试模式
Dir string `json:"dir"` // 备份文件存放目录.
recvoerTimeUnix uint32 `json:"-"`
// InstanceType string `json:"instanceType"`
}

type pitrRecoverJob struct {
Expand Down Expand Up @@ -97,7 +97,6 @@ func (s *pitrRecoverJob) checkDstMongo() error {
}
var notEmptyDb []string
for _, db := range dbList {

if mymongo.IsSysDb(db) {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type removeNsJob struct {
MongoInst *mymongo.MongoHost
MongoClient *mongo.Client
tmp struct {
Err error
NsList []logical.DbCollection
NsIndex map[string][]*mongo.IndexSpecification
}
Expand Down Expand Up @@ -105,6 +106,12 @@ func connectPrimary(host *mymongo.MongoHost) (client *mongo.Client, err error) {
}

func (s *removeNsJob) dropCollection() (err error) {

if s.tmp.Err != nil && errors.Is(s.tmp.Err, logical.ErrorNoMatchDb) {
s.runtime.Logger.Info("no matched database and collection.")
return nil
}

err = s.backupIndex()
if err != nil {
return err
Expand Down Expand Up @@ -285,7 +292,15 @@ func (s *removeNsJob) getNsList() (err error) {

dbColList, err := logical.GetDbCollectionWithFilter(s.MongoInst.Host, s.MongoInst.Port,
s.MongoInst.User, s.MongoInst.Pass, s.MongoInst.AuthDb, filter)

s.runtime.Logger.Info(fmt.Sprintf("GetDbCollectionWithFilter: return %+v %v", dbColList, err))

if err != nil {
// 如果没有匹配的库表,算作成功. 返回Error给后面的流程处理.
if errors.Is(err, logical.ErrorNoMatchDb) {
s.tmp.Err = err
return nil
}
return errors.Wrap(err, "GetDbCollectionWithFilter")
}
// skip sys db
Expand All @@ -297,10 +312,9 @@ func (s *removeNsJob) getNsList() (err error) {
}

if len(s.tmp.NsList) == 0 {
return errors.Errorf("no matched db and col found")
s.tmp.Err = logical.ErrorNoMatchDb
}
s.runtime.Logger.Info(fmt.Sprintf("getNsList:%+v", s.tmp.NsList))

return nil
} else {
s.tmp.NsList, err = logical.GetDbCollection(s.MongoInst.Host, s.MongoInst.Port,
Expand Down
Loading

0 comments on commit ba17a23

Please sign in to comment.