From ba17a23788598aaf9946a0cd3ab4386c36772c26 Mon Sep 17 00:00:00 2001 From: cycker Date: Tue, 16 Jul 2024 14:38:36 +0800 Subject: [PATCH] feat(mongodb): create mongodb pwfile #5429 --- dbm-services/go.work | 2 +- .../pkg/atomjobs/atommongodb/backup.go | 21 +++-- .../pkg/atomjobs/atommongodb/hello.go | 72 +++++++++++++++ .../pkg/atomjobs/atommongodb/install_dbmon.go | 69 ++++++++------ .../pkg/atomjobs/atommongodb/pitr_restore.go | 3 +- .../pkg/atomjobs/atommongodb/remove_ns.go | 18 +++- .../pkg/atomjobs/atommongodb/restore.go | 78 +++++++++++++--- .../dbactuator/pkg/common/exporter_conf.go | 3 +- .../dbactuator/pkg/jobmanager/jobmanager.go | 1 + .../dbmon/cmd/mongojob/check_health_job.go | 12 +-- .../mongodb/db-tools/dbmon/cmd/root.go | 5 +- .../mongodb/db-tools/dbmon/config/config.go | 2 +- .../mongodb/db-tools/dbmon/mylog/mylog.go | 13 +-- .../dbmon/pkg/sendwarning/bkmonitorbeat.go | 2 +- .../mongo-toolkit-go/pkg/mycmd/mycmd.go | 89 ++++++++++--------- .../mongo-toolkit-go/pkg/mycmd/mycmd_test.go | 10 ++- .../mongo-toolkit-go/pkg/mymongo/mymongo.go | 3 +- .../pkg/report/backup_record.go | 10 ++- .../toolkit/logical/db_collection.go | 4 +- .../mongo-toolkit-go/toolkit/logical/dump.go | 28 +++--- .../mongo-toolkit-go/toolkit/pitr/backup.go | 7 +- .../mongo-toolkit-go/toolkit/pitr/job.go | 22 +++++ .../mongo-toolkit-go/toolkit/pitr/metav2.go | 2 +- .../mongo-toolkit-go/toolkit/pitr/recover.go | 8 +- .../toolkit/pitr/recover_helper.go | 13 ++- .../toolkit/pitr/recover_helper_test.go | 26 ++++++ .../db-remote-service/pkg/redis_rpc/common.go | 8 +- .../db-remote-service/pkg/redis_rpc/init.go | 2 + .../pkg/redis_rpc/redis_rpc.go | 33 ++++++- 29 files changed, 415 insertions(+), 151 deletions(-) create mode 100644 dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/hello.go create mode 100644 dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper_test.go diff --git a/dbm-services/go.work b/dbm-services/go.work index bc6bcf85d7..549f8cb59b 100644 --- a/dbm-services/go.work +++ b/dbm-services/go.work @@ -1,4 +1,4 @@ -go 1.21.11 +go 1.21 use ( bigdata/db-tools/dbactuator diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/backup.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/backup.go index ff47da7764..7286db14d3 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/backup.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/backup.go @@ -12,6 +12,7 @@ import ( "dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical" "encoding/json" "fmt" + "os" "path" "time" @@ -31,7 +32,6 @@ const backupTypePhysical string = "physical" // 未实现 // backupParams 备份任务参数,由前端传入 type backupParams struct { - // 这个参数是不是可以从bk-dbmon.conf中获得? BkDbmInstance config.BkDbmLabel `json:"bk_dbm_instance"` IP string `json:"ip"` Port int `json:"port"` @@ -136,15 +136,19 @@ func (s *backupJob) doLogicalBackup() error { partialArgs.DbList, partialArgs.IgnoreDbList, partialArgs.ColList, partialArgs.IgnoreColList) - cmdLineList, cmdLine, err := helper.DumpPartial(tmpPath, "dump.log", filter) + cmdLineList, cmdLine, err, _ := helper.DumpPartial(tmpPath, "dump.log", filter) if err != nil { - s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err) - return errors.Wrap(err, "LogicalDumpPartial") + if errors.Is(err, logical.ErrorNoMatchDb) { + s.runtime.Logger.Warn("NoMatchDb") + return nil + } else { + s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err) + return errors.Wrap(err, "LogicalDumpPartial") + } } s.runtime.Logger.Info("exec cmd success, cmd: %+v", cmdLineList) } else { - // backupType = "dumpAll" cmdLine, err := helper.LogicalDumpAll(tmpPath, "dump.log") if err != nil { s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err) @@ -180,6 +184,13 @@ func (s *backupJob) doLogicalBackup() error { endTime = time.Now() fSize, _ := util.GetFileSize(tarPath) s.runtime.Logger.Info("backup file: %s size: %d", tarPath, fSize) + + err = os.Chmod(tarPath, 0744) + if err != nil { + s.runtime.Logger.Error("chmod 0744 %s, err:%v", tarPath, err) + return errors.Wrap(err, "chmod") + } + // 上报备份记录。 task, err := backupsys.UploadFile(tarPath, s.ConfParams.BsTag) // 如果此处失败,任务失败。 diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/hello.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/hello.go new file mode 100644 index 0000000000..99195c7cba --- /dev/null +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/hello.go @@ -0,0 +1,72 @@ +package atommongodb + +import ( + "dbm-services/mongodb/db-tools/dbactuator/pkg/jobruntime" + "encoding/json" + "github.com/pkg/errors" +) + +// hello 总是返回成功. 用于测试流程 + +// helloParams 备份任务参数,由前端传入 +type helloParams struct { + IP string `json:"ip"` + Port int `json:"port"` + AdminUsername string `json:"adminUsername"` + AdminPassword string `json:"adminPassword"` +} + +type helloJob struct { + BaseJob + ConfParams *helloParams +} + +func (s *helloJob) Param() string { + o, _ := json.MarshalIndent(backupParams{}, "", "\t") + return string(o) +} + +// NewHelloJob 实例化结构体 +func NewHelloJob() jobruntime.JobRunner { + return &helloJob{} +} + +// Name 获取原子任务的名字 +func (s *helloJob) Name() string { + return "mongodb_hello" +} + +// Run 运行原子任务 +func (s *helloJob) Run() error { + s.runtime.Logger.Info("Run") + return nil +} + +// Retry 重试 +func (s *helloJob) Retry() uint { + // do nothing + return 2 +} + +// Rollback 回滚 +func (s *helloJob) Rollback() error { + return nil +} + +// Init 初始化 +func (s *helloJob) Init(runtime *jobruntime.JobGenericRuntime) error { + // 获取安装参数 + s.runtime = runtime + s.OsUser = "" // 备份进程,不再需要sudo,请以普通用户执行 + if checkIsRootUser() { + s.runtime.Logger.Error("This job cannot be executed as root user") + return errors.New("This job cannot be executed as root user") + } + if err := json.Unmarshal([]byte(s.runtime.PayloadDecoded), &s.ConfParams); err != nil { + tmpErr := errors.Wrap(err, "payload json.Unmarshal failed") + s.runtime.Logger.Error(tmpErr.Error()) + return tmpErr + } + + return nil +} diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/install_dbmon.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/install_dbmon.go index 912a584149..43ff4237c4 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/install_dbmon.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/install_dbmon.go @@ -72,6 +72,7 @@ func (job *installDbmonJob) Name() string { func (job *installDbmonJob) Run() error { // 生成配置文件 updateDbTool updateDbmon startDbmon return job.runSteps([]stepFunc{ + {"mkExporterConfigFile", job.mkExporterConfigFile}, {"updateConfigFile", job.updateConfigFile}, {"updateDbTool", job.updateDbTool}, {"updateToolKit", job.updateToolKit}, @@ -156,6 +157,21 @@ func compareServers(old, new []config.ConfServerItem) bool { return true } +// mkExporterConfigFile exporter需要的用户和密码. +func (job *installDbmonJob) mkExporterConfigFile() error { + var err error + for _, s := range job.params.Servers { + err = common.WriteExporterConfigFile(s.Port, map[string]string{ + "username": s.UserName, + "password": s.Password, + }) + if err != nil { + return err + } + } + return nil +} + func (job *installDbmonJob) updateConfigFile() error { // consts.BkDbmonPath configFile := consts.BkDbmonConfFile @@ -174,11 +190,20 @@ func (job *installDbmonJob) updateConfigFile() error { } if oldConf != nil { - if oldConf.ReportSaveDir == conf.ReportSaveDir && - oldConf.ReportLeftDay == conf.ReportLeftDay && - oldConf.HttpAddress == conf.HttpAddress && - oldConf.BkMonitorBeat == conf.BkMonitorBeat && - compareServers(oldConf.Servers, conf.Servers) { + eq := make(map[string]bool) + eq["ReportSaveDir"] = oldConf.ReportSaveDir == conf.ReportSaveDir + eq["ReportLeftDay"] = oldConf.ReportLeftDay == conf.ReportLeftDay + eq["HttpAddress"] = oldConf.HttpAddress == conf.HttpAddress + eq["BkMonitorBeat"] = reflect.DeepEqual(oldConf.BkMonitorBeat, conf.BkMonitorBeat) + eq["Servers"] = compareServers(oldConf.Servers, conf.Servers) + ndiff := 0 + for fieldName, same := range eq { + if !same { + ndiff++ + job.runtime.Logger.Info("config file %s %q has been changed", configFile, fieldName) + } + } + if ndiff == 0 { job.runtime.Logger.Info("config file %s has not been changed", configFile) return nil } @@ -195,8 +220,6 @@ func (job *installDbmonJob) updateConfigFile() error { err = cpfile(srcFile, dstFile) if err != nil { job.runtime.Logger.Warn("cpfile %s to %s failed, err:%v", srcFile, dstFile, err) - } else { - job.runtime.Logger.Info("cpfile %s to %s success", srcFile, dstFile) } } } @@ -236,18 +259,20 @@ func (job *installDbmonJob) updateToolKit() error { prevFile := path.Join(consts.PackageCachePath, fileName) newFile := path.Join(consts.PackageSavePath, fileName) dstDir := "/home/mysql/dbtools/mg/" + dstFile := path.Join(dstDir, fileName) util.MkDirsIfNotExists([]string{dstDir}) skipped, err := untarMedia(prevFile, newFile, dstDir) if err != nil { - return errors.Wrap(err, "updateDbTool") + return errors.Wrap(err, "updateToolKit") } if skipped { job.runtime.Logger.Info( - "updateDbTool %s to %s skipped. Because the MD5 value of the file has not changed.", + "updateToolKit %s to %s skipped. Because the MD5 value of the file has not changed.", newFile, dstDir) return nil } - job.runtime.Logger.Info("updateDbTool %s to %s done", newFile, dstDir) + os.Chmod(dstFile, 0755) + job.runtime.Logger.Info("updateToolKit %s to %s done", newFile, dstDir) cpfile(newFile, prevFile) // 只在成功untar后才能cp return nil } @@ -283,7 +308,7 @@ func (job *installDbmonJob) startDbmon() error { job.updateDbmonFlag = true // always restart bk-dbmon if isRunning { - job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed. no need to restart. ") + job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed, restart dbmon") if err := exec.Command("kill", "-9", strconv.Itoa(pid)).Run(); err != nil { return errors.Wrap(err, "kill -9") } else { @@ -293,12 +318,14 @@ func (job *installDbmonJob) startDbmon() error { } else { job.runtime.Logger.Info("bk-dbmon is not running") } + cmd := mycmd.New("/home/mysql/bk-dbmon/start.sh") + job.runtime.Logger.Info("exec %s", cmd.GetCmdLine2(false)) + cmd.Run(30 * time.Second) - pid, err = startDbmon(consts.BkDbmonBin, consts.BkDbmonConfFile, "output.log") + pid, err = dbmonIsRunning(consts.BkDbmonBin) if err != nil || pid <= 0 { - return errors.Errorf("start dbmon failed, err:%v, pid:%d", err, pid) + return errors.New("bk-dbmon start failed") } - job.runtime.Logger.Info("start dbmon success, pid:%d", pid) return nil } @@ -340,7 +367,6 @@ func untarMedia(prevFile, newFile, dstDir string) (skipped bool, err error) { } } - // 是否要将output打印出来呢?不了吧,太多了. return } @@ -407,16 +433,3 @@ func dbmonIsRunning(comm string) (pid int, err error) { } return 0, nil } - -func startDbmon(dbmonBin, configFilePath, outputFileName string) (pid int, err error) { - if !util.FileExists(dbmonBin) { - err = errors.New("dbmonBin not exists") - - } - if err = os.Chdir(path.Dir(dbmonBin)); err != nil { - err = errors.Wrap(err, "os.Chdir") - return - } - - return mycmd.New(dbmonBin, "--config", path.Base(configFilePath)).RunBackground(outputFileName) -} diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/pitr_restore.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/pitr_restore.go index 883eb1cd8c..9a92b2ed8a 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/pitr_restore.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/pitr_restore.go @@ -30,12 +30,12 @@ type pitrRecoverParam struct { Port int `json:"port"` AdminUsername string `json:"adminUsername"` AdminPassword string `json:"adminPassword"` - InstanceType string `json:"instanceType"` SrcAddr string `json:"srcAddr"` // ip:port RecoverTimeStr string `json:"recoverTimeStr"` // recoverTime yyyy-mm-ddTHH:MM:SS DryRun bool `json:"dryRun"` // 测试模式 Dir string `json:"dir"` // 备份文件存放目录. recvoerTimeUnix uint32 `json:"-"` + // InstanceType string `json:"instanceType"` } type pitrRecoverJob struct { @@ -97,7 +97,6 @@ func (s *pitrRecoverJob) checkDstMongo() error { } var notEmptyDb []string for _, db := range dbList { - if mymongo.IsSysDb(db) { continue } diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/remove_ns.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/remove_ns.go index c1f9c855ff..aa9f6757e6 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/remove_ns.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/remove_ns.go @@ -48,6 +48,7 @@ type removeNsJob struct { MongoInst *mymongo.MongoHost MongoClient *mongo.Client tmp struct { + Err error NsList []logical.DbCollection NsIndex map[string][]*mongo.IndexSpecification } @@ -105,6 +106,12 @@ func connectPrimary(host *mymongo.MongoHost) (client *mongo.Client, err error) { } func (s *removeNsJob) dropCollection() (err error) { + + if s.tmp.Err != nil && errors.Is(s.tmp.Err, logical.ErrorNoMatchDb) { + s.runtime.Logger.Info("no matched database and collection.") + return nil + } + err = s.backupIndex() if err != nil { return err @@ -285,7 +292,15 @@ func (s *removeNsJob) getNsList() (err error) { dbColList, err := logical.GetDbCollectionWithFilter(s.MongoInst.Host, s.MongoInst.Port, s.MongoInst.User, s.MongoInst.Pass, s.MongoInst.AuthDb, filter) + + s.runtime.Logger.Info(fmt.Sprintf("GetDbCollectionWithFilter: return %+v %v", dbColList, err)) + if err != nil { + // 如果没有匹配的库表,算作成功. 返回Error给后面的流程处理. + if errors.Is(err, logical.ErrorNoMatchDb) { + s.tmp.Err = err + return nil + } return errors.Wrap(err, "GetDbCollectionWithFilter") } // skip sys db @@ -297,10 +312,9 @@ func (s *removeNsJob) getNsList() (err error) { } if len(s.tmp.NsList) == 0 { - return errors.Errorf("no matched db and col found") + s.tmp.Err = logical.ErrorNoMatchDb } s.runtime.Logger.Info(fmt.Sprintf("getNsList:%+v", s.tmp.NsList)) - return nil } else { s.tmp.NsList, err = logical.GetDbCollection(s.MongoInst.Host, s.MongoInst.Port, diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go index 49c593efa4..aec4cbdaf8 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/atomjobs/atommongodb/restore.go @@ -39,7 +39,7 @@ type restoreParam struct { InstanceType string `json:"instanceType"` Args struct { - RecoverDir string `json:"RecoverDir"` // /data/dbbak/recover_mg/ + RecoverDir string `json:"recoverDir"` // /data/dbbak/recover_mg/ SrcFile []BsTaskArg `json:"srcFile"` // 目前只需要1个文件,但是为了兼容,还是使用数组. IsPartial bool `json:"isPartial"` // 为true时,备份指定库和表 Oplog bool `json:"oplog"` // 是否备份oplog,只有在IsPartial为false可为true @@ -73,13 +73,23 @@ func (s *restoreJob) Name() string { // Run 运行原子任务 func (s *restoreJob) Run() error { - err := s.checkDstMongo() - if err != nil { - return errors.Wrap(err, "checkDstMongo") + + type execFunc struct { + name string + f func() error + } + for _, f := range []execFunc{ + {"checkDstMongo", s.checkDstMongo}, + {"doLogicalRestore", s.doLogicalRestore}, + } { + s.runtime.Logger.Info("Run %s start", f.name) + if err := f.f(); err != nil { + s.runtime.Logger.Error("Run %s failed. err %s", f.name, err.Error()) + return errors.Wrap(err, f.name) + } + s.runtime.Logger.Info("Run %s done", f.name) } - s.runtime.Logger.Info("checkDstMongo ok") - // 1. check dst mongo is ok - return s.doLogicalRestore() + return nil } // checkDstMongo 检查目标MongoDB中,没有要恢复的库和表. @@ -124,18 +134,20 @@ func (s *restoreJob) doLogicalRestore() error { } log.Info("end untar file %s, dstDir %s", srcFilePath, dstDir) dstDirWithDump := path.Join(dstDir, "dump") + + // dbm系统产生的表. 不需要重复导入. + deletedCol, err := s.removeDbmSysNs(dstDirWithDump) + if err != nil { + return errors.Wrap(err, "removeDbmSysNs") + } else { + log.Info("removeDbmSysNs File: %+v", deletedCol) + } // get DbCollection from Dir dbColList, err := logical.GetDbCollectionFromDir(dstDirWithDump) if err != nil { return errors.Wrap(err, "GetDbCollectionFromDir") } - for _, row := range dbColList { - if row.Db == "admin" { - continue - } - } - // 导入部分表时,要删除掉不需要的库和表文件. s.removeNsByFilter(dbColList, dstDirWithDump) @@ -174,6 +186,44 @@ func (s *restoreJob) doLogicalRestore() error { return nil } +// removeDbmSysNs +// 删除掉 admin/gcs.backup文件 +// 删除掉 test/dbmon_heartbeat文件 +// 删除掉 test/dbmon_heartbeat文件 +func (s *restoreJob) removeDbmSysNs(dstDir string) ([]string, error) { + var colList = [][2]string{ + {"admin", "gcs.backup"}, + {"test", "dbmon.heartbeat"}, + {"test", "dbmon_heartbeat"}, + } + var deletedCol []string + + for _, row := range colList { + db, col := row[0], row[1] + var toDelFileList []string + toDelFileList = append(toDelFileList, + fmt.Sprintf("%s/%s/%s.bson", dstDir, db, col), + fmt.Sprintf("%s/%s/%s.bson.gz", dstDir, db, col), + fmt.Sprintf("%s/%s/%s.metadata.json", dstDir, db, col), + ) + ndel := 0 + for _, file := range toDelFileList { + if !util.FileExists(file) { + continue + } + err := os.Remove(file) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("Remove %s", file)) + } + ndel++ + } + if ndel > 0 { + deletedCol = append(deletedCol, fmt.Sprintf("%s.%s", db, col)) + } + } + return deletedCol, nil +} + func (s *restoreJob) removeNsByFilter(dbColList []logical.DbCollection, dstDir string) error { // 导入部分表时,要删除掉不需要的库和表文件. if !s.param.Args.IsPartial { @@ -197,7 +247,7 @@ func (s *restoreJob) removeNsByFilter(dbColList []logical.DbCollection, dstDir s continue } for _, col := range notMatchList { - var toDelFileList = []string{} + var toDelFileList []string toDelFileList = append(toDelFileList, fmt.Sprintf("%s/%s/%s.bson", dstDir, row.Db, col), fmt.Sprintf("%s/%s/%s.bson.gz", dstDir, row.Db, col), diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/common/exporter_conf.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/common/exporter_conf.go index c4492770ad..e8ce242adf 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/common/exporter_conf.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/common/exporter_conf.go @@ -5,7 +5,6 @@ import ( "dbm-services/mongodb/db-tools/dbactuator/pkg/util" "encoding/json" "fmt" - "io/ioutil" "os" "path/filepath" ) @@ -30,7 +29,7 @@ func WriteExporterConfigFile(port int, data interface{}) (err error) { } confFile = getConfFileName(port) fileData, _ = json.Marshal(data) - err = ioutil.WriteFile(confFile, fileData, 0755) + err = os.WriteFile(confFile, fileData, 0755) if err != nil { return err } diff --git a/dbm-services/mongodb/db-tools/dbactuator/pkg/jobmanager/jobmanager.go b/dbm-services/mongodb/db-tools/dbactuator/pkg/jobmanager/jobmanager.go index 7d5490151e..bcea021b03 100644 --- a/dbm-services/mongodb/db-tools/dbactuator/pkg/jobmanager/jobmanager.go +++ b/dbm-services/mongodb/db-tools/dbactuator/pkg/jobmanager/jobmanager.go @@ -150,6 +150,7 @@ func (m *JobGenericManager) RegisterAtomJob() { atommongodb.NewMongoStartProcess, atommongodb.NewReplacePackage, atommongodb.NewMongoSetFCV, + atommongodb.NewHelloJob, } { m.atomJobMapper[f().Name()] = f } diff --git a/dbm-services/mongodb/db-tools/dbmon/cmd/mongojob/check_health_job.go b/dbm-services/mongodb/db-tools/dbmon/cmd/mongojob/check_health_job.go index 825026b93c..4955078a81 100644 --- a/dbm-services/mongodb/db-tools/dbmon/cmd/mongojob/check_health_job.go +++ b/dbm-services/mongodb/db-tools/dbmon/cmd/mongojob/check_health_job.go @@ -42,6 +42,7 @@ type CheckHealthJob struct { // NOCC:golint/naming(其他:设计如此) } const mongoBin = "/usr/local/mongodb/bin/mongo" +const startMongoScript = "/usr/local/mongodb/bin/start_mongo.sh" // Run 执行例行备份. 被cron对象调用 func (job *CheckHealthJob) Run() { @@ -94,9 +95,9 @@ func (job *CheckHealthJob) runOneServer(svrItem *config.ConfServerItem) { return } - // 不存在,尝试启动 - // 启动成功: 发送消息LoginSuccess - // 启动失败: 发送消息LoginFailed + // 进程不存在,尝试启动 + // 启动成功: 发送消息LoginSuccess + // 启动失败: 发送消息LoginFailed startMongo(svrItem.Port) err = checkService(loginTimeout, svrItem) if err == nil { @@ -161,8 +162,9 @@ func checkService(loginTimeout int, svrItem *config.ConfServerItem) error { } func startMongo(port int) error { - cmd := "/usr/local/mongodb/bin/start.sh" - _, err := DoCommandWithTimeout(60, cmd, fmt.Sprintf("%d", port)) + ret, err := DoCommandWithTimeout(60, startMongoScript, fmt.Sprintf("%d", port)) + mylog.Logger.Info(fmt.Sprintf("exec %s return err:%v", ret.Cmdline, err)) + if err != nil { return err } diff --git a/dbm-services/mongodb/db-tools/dbmon/cmd/root.go b/dbm-services/mongodb/db-tools/dbmon/cmd/root.go index 8ccefb9b01..ed1363cc16 100644 --- a/dbm-services/mongodb/db-tools/dbmon/cmd/root.go +++ b/dbm-services/mongodb/db-tools/dbmon/cmd/root.go @@ -95,16 +95,15 @@ Wait each job finish,the job result would write to local file, and other program entryID, err = c.AddJob(row.cron, cron.NewChain(cron.SkipIfStillRunning(mylog.AdapterLog)).Then(row.job)) if err != nil { - log.Panicf("addjob fail,jobName: %s entryID:%d,err:%v\n", row.name, entryID, err) + log.Panicf("addjob fail,jobName: %s entryID:%d,err:%v", row.name, entryID, err) return } - mylog.Logger.Info(fmt.Sprintf("AddJob %s, entryID:%d ", row.name, entryID)) mylog.Logger.Info("AddJob success", zapcore.Field{Key: "jobName", Type: zapcore.StringType, String: row.name}, zapcore.Field{Key: "entryID", Type: zapcore.Int64Type, Integer: int64(entryID)}, ) } - mylog.Logger.Info(fmt.Sprintf("start dbmon, Listen:%s\n", config.GlobalConf.HttpAddress)) + mylog.Logger.Info(fmt.Sprintf("start dbmon, Listen:%s", config.GlobalConf.HttpAddress)) c.Start() // go func() { // // for go tool pprof. curl http://127.0.0.1:6600/debug/pprof/heap diff --git a/dbm-services/mongodb/db-tools/dbmon/config/config.go b/dbm-services/mongodb/db-tools/dbmon/config/config.go index ec87085edf..ddfb953557 100644 --- a/dbm-services/mongodb/db-tools/dbmon/config/config.go +++ b/dbm-services/mongodb/db-tools/dbmon/config/config.go @@ -24,7 +24,7 @@ type BkDbmLabel struct { BkCloudID int64 `json:"bk_cloud_id" mapstructure:"bk_cloud_id" yaml:"bk_cloud_id"` BkBizID int `json:"bk_biz_id" mapstructure:"bk_biz_id" yaml:"bk_biz_id" yaml:"bk_biz_id"` App string `json:"app" mapstructure:"app" yaml:"app"` - AppName string `json:"app_name" mapstructure:"-" yaml:"app_name"` + AppName string `json:"app_name" mapstructure:"app_name" yaml:"app_name"` ClusterDomain string `json:"cluster_domain" mapstructure:"cluster_domain" yaml:"cluster_domain"` ClusterId int64 `json:"cluster_id" mapstructure:"cluster_id" yaml:"cluster_id"` ClusterName string `json:"cluster_name" mapstructure:"cluster_name" yaml:"cluster_name"` diff --git a/dbm-services/mongodb/db-tools/dbmon/mylog/mylog.go b/dbm-services/mongodb/db-tools/dbmon/mylog/mylog.go index 56dbced6ed..802f3147af 100644 --- a/dbm-services/mongodb/db-tools/dbmon/mylog/mylog.go +++ b/dbm-services/mongodb/db-tools/dbmon/mylog/mylog.go @@ -5,13 +5,10 @@ import ( "fmt" "log" "os" - "os/exec" "path/filepath" "strings" "time" - "dbm-services/mongodb/db-tools/dbmon/pkg/consts" - "github.com/robfig/cron/v3" "github.com/spf13/viper" "go.uber.org/zap" @@ -63,10 +60,6 @@ func InitRotateLoger() { logDir := filepath.Join(currDir, "logs") mkdirIfNotExistsWithPerm(logDir, 0750) - chownCmd := fmt.Sprintf("chown -R %s.%s %s", consts.MysqlAaccount, consts.MysqlGroup, logDir) - cmd := exec.Command("bash", "-c", chownCmd) - cmd.Run() - cfg := zap.NewProductionConfig() cfg.EncoderConfig = zapcore.EncoderConfig{ MessageKey: "msg", @@ -86,9 +79,9 @@ func InitRotateLoger() { lj := zapcore.AddSync(&lumberjack.Logger{ Filename: filepath.Join(logDir, "bk-dbmon.log"), - MaxSize: 256, // 单个日志文件大小,单位MB - MaxBackups: 10, // 最多保存10个文件 - MaxAge: 15, // 最多保存15天内的日志 + MaxSize: 64, // 单个日志文件大小,单位MB + MaxBackups: 7, // 最多保存10个文件 + MaxAge: 15, // 最多保存15天内的日志 LocalTime: true, Compress: true, }) diff --git a/dbm-services/mongodb/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go b/dbm-services/mongodb/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go index 79400e0fbb..b9ac307a24 100644 --- a/dbm-services/mongodb/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go +++ b/dbm-services/mongodb/db-tools/dbmon/pkg/sendwarning/bkmonitorbeat.go @@ -100,8 +100,8 @@ func (bm *BkMonitorEventSender) SendTimeSeriesMsg(dataId int64, token string, ta "-report.message.kind", "timeseries", "-report.agent.address", bm.AgentAddress, "-report.message.body", string(tempBytes)) - mylog.Logger.Info(sendCmd.GetCmdLine("", false)) _, err = sendCmd.Run2(20 * time.Second) + mylog.Logger.Info(fmt.Sprintf("%s err: %v", sendCmd.GetCmdLine("", false), err)) return } diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd.go index 378464910a..eb206479e5 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd.go @@ -15,21 +15,17 @@ import ( "github.com/pkg/errors" ) -type arg struct { - v string - isPwd bool -} - type Password string +type Val string -// CmdBuilder 用于生成给sh执行的命令行,支持标记密码参数,用于生成不带密码的命令行 +// CmdBuilder 用于生成给sh执行的命令行, 生成命令行时,Password和Val会添加单引号 type CmdBuilder struct { - Args []arg + Args []interface{} } // New NewCmdBuilder and append v func New(v ...interface{}) *CmdBuilder { - return NewCmdBuilder().AppendArg(v...) + return NewCmdBuilder().Append(v...) } // NewCmdBuilder New CmdBuilder @@ -38,51 +34,64 @@ func NewCmdBuilder() *CmdBuilder { return &c } -func (c *CmdBuilder) appendOne(v string, isPwd bool) *CmdBuilder { - c.Args = append(c.Args, arg{v, isPwd}) - return c -} - -// AppendArg Append interface arg -func (c *CmdBuilder) AppendArg(v ...interface{}) *CmdBuilder { - for _, vv := range v { - switch vv.(type) { - case Password: - _ = c.appendOne(string(vv.(Password)), true) - case string: - _ = c.appendOne(vv.(string), false) - default: - // 只接受string和Password类型。 不应该出现其他类型 - _ = c.appendOne(fmt.Sprintf("%v", vv), false) - } - - } +func (c *CmdBuilder) appendOne(v interface{}) *CmdBuilder { + c.Args = append(c.Args, v) return c } -// Append Append string arg -func (c *CmdBuilder) Append(v ...string) *CmdBuilder { +// Append string arg +func (c *CmdBuilder) Append(v ...interface{}) *CmdBuilder { for _, vv := range v { - _ = c.appendOne(vv, false) + c.appendOne(vv) } return c } // AppendPassword Append password arg func (c *CmdBuilder) AppendPassword(v string) *CmdBuilder { - return c.appendOne(v, true) + return c.appendOne(Password(v)) +} + +// argToString 生成命令行内容 +// @replacePassword 将密码替换成xxx +// @isCmdLine 生成cmdline给bash调用的,为Val, Password添加” +func argToString(v interface{}, replacePassword bool, isCmdLine bool) string { + switch v.(type) { + case string: + return fmt.Sprintf("%s", v) + case int, int8, int16, int32, int64: + return fmt.Sprintf("%d", v) + case Val: + if isCmdLine { + return fmt.Sprintf("'%s'", v) + } else { + return fmt.Sprintf("%s", v) + } + case Password: + if isCmdLine { + if replacePassword { + return "xxx" + } else { + return fmt.Sprintf(`'%s'`, v) + } + } else { + if replacePassword { + return "xxx" + } else { + return fmt.Sprintf(`%s`, v) + } + } + default: + panic(fmt.Sprintf("mycmd argToString bad type %T", v)) + } } // GetCmdLine Get cmd line with suUser // replacePassword 是否替换密码 func (c *CmdBuilder) GetCmdLine(suUser string, replacePassword bool) string { tmpSlice := make([]string, 0, len(c.Args)) - for _, argItem := range c.Args { - if replacePassword && argItem.isPwd { - tmpSlice = append(tmpSlice, "xxx") - } else { - tmpSlice = append(tmpSlice, argItem.v) - } + for _, v := range c.Args { + tmpSlice = append(tmpSlice, argToString(v, replacePassword, true)) } cmdLine := strings.Join(tmpSlice, " ") if suUser != "" { @@ -98,10 +107,10 @@ func (c *CmdBuilder) GetCmdLine2(replacePassword bool) string { // GetCmd Get cmd and args func (c *CmdBuilder) GetCmd() (bin string, args []string) { - bin = c.Args[0].v + bin = c.Args[0].(string) args = make([]string, 0, len(c.Args)-1) for _, argItem := range c.Args[1:] { - args = append(args, argItem.v) + args = append(args, argToString(argItem, false, false)) } return } @@ -164,7 +173,7 @@ func (c *CmdBuilder) Run3(timeout time.Duration, stdout, stderr io.Writer) (*Exe } -// RunBg run in background +// RunBackground run in background func (c *CmdBuilder) RunBackground(outputFileName string) (pid int, err error) { bin, args := c.GetCmd() cmd := exec.Command(bin, args...) diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd_test.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd_test.go index 7f632f920a..7b0f990d43 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd_test.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mycmd/mycmd_test.go @@ -15,14 +15,18 @@ func TestCmd(t *testing.T) { } for _, v := range input { cb := NewCmdBuilder() - cb.Append(v.cmd).Append(v.args...) + for _, vv := range v.args { + cb.Append(v.cmd).Append(vv) + } + o, err := cb.Run2(5 * time.Second) + t.Logf("cmd %s stdout %q stderr %q err %v", + cb.GetCmdLine("", false), o.OutBuf.String(), o.ErrBuf.String(), err) if err != nil { t.Errorf("cmd %s err %v", cb.GetCmdLine("", false), err) continue } - t.Logf("cmd %s stdout %q stderr %q err %v", - cb.GetCmdLine("", false), o.OutBuf.String(), o.ErrBuf.String(), err) + } } diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mymongo/mymongo.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mymongo/mymongo.go index 2a301004bd..9af9ddf719 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mymongo/mymongo.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mymongo/mymongo.go @@ -4,6 +4,7 @@ package mymongo import ( "context" "fmt" + "net/url" "time" "go.mongodb.org/mongo-driver/mongo" @@ -23,7 +24,7 @@ type MongoHost struct { // Connect return mongo client func Connect(host, port, user, pass, authdb string, timeout time.Duration) (*mongo.Client, error) { - mongoURI := fmt.Sprintf("mongodb://%s:%s@%s:%s/%s", user, pass, host, port, authdb) + mongoURI := fmt.Sprintf("mongodb://%s:%s@%s:%s/%s", user, url.QueryEscape(pass), host, port, authdb) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return mongo.Connect(ctx, options.Client().ApplyURI(mongoURI)) diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/report/backup_record.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/report/backup_record.go index 285eb63183..c0e0302467 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/report/backup_record.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/report/backup_record.go @@ -9,10 +9,12 @@ import ( // 关联实例的维度信息 // 备份系统中的关联信息 TaskId,是否已完成 // 发起备份关联信息 -// 来自单据. 1. ReleateBillId, ReleateBillInfo:一个打包的Json +// 来自单据. 1. RelatedBillId, ReleateBillInfo:一个打包的Json // 来自日常备份: type BackupRecord struct { config.BkDbmLabel + ServerIp string `json:"server_ip"` // 值等于IP + ServerPort int `json:"server_port"` // 值等于Port ReportType string `json:"report_type"` // mongo_backup_result 日志平台 // File Info StartTime string `json:"start_time"` // 备份起始时间,格式为"2023-12-27T13:00:15+08:00" @@ -34,7 +36,7 @@ type BackupRecord struct { PitrBinlogIndex uint32 `json:"pitr_binlog_index"` // 增量备份的index,是一个递增数字 PitrLastPos uint32 `json:"pitr_last_pos"` // UnixTime. binlog_last_time. src == daily时有值 // if src == bill - ReleateBillId string `json:"releate_bill_id"` // + RelatedBillId string `json:"related_bill_id"` // related_bill_id TotalFileNum int `json:"total_file_num"` // MyFileNum int `json:"my_file_num"` // ReleateBillInfo string `json:"releate_bill_info"` // 关联的Bill的内容,是一个Json数据. @@ -61,6 +63,8 @@ func (b *BackupRecord) AppendMetaLabel(meta *config.BkDbmLabel) error { return nil } b.BkDbmLabel = *meta + b.ServerIp = b.BkDbmLabel.IP + b.ServerPort = b.BkDbmLabel.Port return nil } @@ -73,7 +77,7 @@ func (b *BackupRecord) AppendBsInfo(taskId, tag string) { // AppendBillSrc append bill src info func (b *BackupRecord) AppendBillSrc(billId, releateBillInfo string, totalFileNum, myFileIdx int) error { b.Src = "bill" - b.ReleateBillId = billId + b.RelatedBillId = billId b.ReleateBillInfo = releateBillInfo b.TotalFileNum = totalFileNum b.MyFileNum = myFileIdx diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/db_collection.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/db_collection.go index b9da8bb1b3..1454cdf50f 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/db_collection.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/db_collection.go @@ -16,6 +16,8 @@ type DbCollection struct { notMachCol []string } +var ErrorNoMatchDb error = errors.New("NoMatchDb") + // GetDbCollectionWithFilter 获取指定mongo的所有db和collection func GetDbCollectionWithFilter(ip, port, user, pass, authDb string, filter *NsFilter) ([]DbCollection, error) { client, err := mymongo.Connect(ip, port, user, pass, authDb, 60*time.Second) @@ -32,7 +34,7 @@ func GetDbCollectionWithFilter(ip, port, user, pass, authDb string, filter *NsFi matchDbList, _ := filter.FilterDb(dbList) // 如果按照输入的db过滤后,没有db了,就报错 if len(matchDbList) == 0 { - return nil, errors.New("no match db") + return nil, ErrorNoMatchDb } var dbColList []DbCollection diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/dump.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/dump.go index 5e2bfb6c4f..b95d9d8799 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/dump.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical/dump.go @@ -59,7 +59,7 @@ func Dump(option *DumpOption) { filter := NewNsFilter(option.Args.PartialArgs.DbList, option.Args.PartialArgs.IgnoreDbList, option.Args.PartialArgs.ColList, option.Args.PartialArgs.IgnoreColList) - cmdLineList, cmdLine, err := helper.DumpPartial(tmpPath, "dump.log", filter) + cmdLineList, cmdLine, err, _ := helper.DumpPartial(tmpPath, "dump.log", filter) if err != nil { log.Errorf("exec cmd fail, cmd: %s, error:%s", cmdLine, err) return @@ -108,9 +108,9 @@ type MongoDumpHelper struct { } // NewMongoDumpHelper 逻辑备份 -func NewMongoDumpHelper(mongoHost *mymongo.MongoHost, dumpBin, user, pass, authDb string, osUser string) *MongoDumpHelper { +func NewMongoDumpHelper(host *mymongo.MongoHost, dumpBin, user, pass, authDb string, osUser string) *MongoDumpHelper { return &MongoDumpHelper{ - MongoHost: mongoHost, + MongoHost: host, MongoDumpBin: dumpBin, User: user, Pass: pass, @@ -130,7 +130,8 @@ func NewMongoDumpHelper(mongoHost *mymongo.MongoHost, dumpBin, user, pass, authD // 2. 备份多个表 : --excludeCollection tableName1 --excludeCollection tableName2 ... // DumpPartial 逻辑备份 指定库表 -func (m *MongoDumpHelper) DumpPartial(outDir string, logFileName string, filter *NsFilter) (cmdLineList []string, cmdLine string, err error) { +func (m *MongoDumpHelper) DumpPartial(outDir string, logFileName string, filter *NsFilter) ( + cmdLineList []string, cmdLine string, err error, nCol int) { // 如果filter为nil,请使用LogicalDumpAll if filter == nil { panic("filter is nil") @@ -141,10 +142,7 @@ func (m *MongoDumpHelper) DumpPartial(outDir string, logFileName string, filter err = errors.Wrap(err, "GetDbCollectionWithFilter") return } - if dbColList == nil { - err = errors.New("no match database or collection") - return - } + fmt.Printf("debug DumpPartial dbColList: %+v\n", dbColList) for _, dbRow := range dbColList { // 没有匹配的表,就不备份 @@ -161,17 +159,19 @@ func (m *MongoDumpHelper) DumpPartial(outDir string, logFileName string, filter func (m *MongoDumpHelper) dumpDbCol(outDir string, logFileName string, dbName string, colList []string, excludeColList []string) (cmdLine string, err error) { - dumpCmd := mycmd.New(m.MongoDumpBin, "-u", m.User, + dumpCmd := mycmd.New(m.MongoDumpBin, + "-u", m.User, "-p", mycmd.Password(m.Pass), - "--host", m.MongoHost.Host, "--port", m.MongoHost.Port, - fmt.Sprintf("--authenticationDatabase=%s", m.AuthDb), + "--host", mycmd.Val(m.MongoHost.Host), + "--port", m.MongoHost.Port, + "--authenticationDatabase="+m.AuthDb, "-d", dbName) if len(colList) == 1 { - dumpCmd.Append("--collection", colList[0]) + dumpCmd.Append("--collection", mycmd.Val(colList[0])) } else if len(excludeColList) > 0 { for _, col := range excludeColList { - dumpCmd.Append("--excludeCollection", col) + dumpCmd.Append("--excludeCollection", mycmd.Val(col)) } } @@ -186,7 +186,7 @@ func (m *MongoDumpHelper) dumpDbCol(outDir string, logFileName string, func (m *MongoDumpHelper) LogicalDumpAll(outDir string, logFileName string) (cmdLine string, err error) { // dumpCmd := mycmd.New(m.MongoDumpBin, "-u", m.User, "-p", mycmd.Password(m.Pass), - "--host", m.MongoHost.Host, "--port", m.MongoHost.Port, + "--host", mycmd.Val(m.MongoHost.Host), "--port", m.MongoHost.Port, fmt.Sprintf("--authenticationDatabase=%s", m.AuthDb), "-o", outDir, ">", path.Join(outDir, logFileName), "2>&1") _, _, _, err = dumpCmd.RunByBash(m.OsUser, time.Hour*24) diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/backup.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/backup.go index 559bf41285..30e0f66e5a 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/backup.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/backup.go @@ -445,7 +445,7 @@ func buildDumpIncrCmd(connInfo *mymongo.MongoHost, zip bool, lastBackup *BackupF dumpCmd.Append("-u", connInfo.User) } if len(connInfo.Pass) > 0 { - dumpCmd.Append("-p").AppendPassword(connInfo.Pass) + dumpCmd.Append("-p", mycmd.Password(connInfo.Pass)) } if zip { dumpCmd.Append("--gzip") @@ -469,7 +469,8 @@ func buildDumpIncrCmd(connInfo *mymongo.MongoHost, zip bool, lastBackup *BackupF } func buildDumpFullCmd(connInfo *mymongo.MongoHost, zip bool, lastBackup *BackupFileName, maxTs *TS) (*mycmd.CmdBuilder, error) { - // ./mongotools/mongodump.2.4 mongodump.3.0 mongodump.3.2 mongodump.3.4 mongodump.3.6 mongodump.4.0 mongodump.4.2 + // ./mongotools/mongodump.2.4 mongodump.3.0 mongodump.3.2 mongodump.3.4 + // mongodump.3.6 mongodump.4.0 mongodump.4.2 version, err := GetVersion(connInfo) if err != nil { return nil, errors.Wrap(err, "get version") @@ -492,7 +493,7 @@ func buildDumpFullCmd(connInfo *mymongo.MongoHost, zip bool, lastBackup *BackupF dumpCmd.Append("-u", connInfo.User) } if len(connInfo.Pass) > 0 { - dumpCmd.Append("-p").AppendPassword(connInfo.Pass) + dumpCmd.Append("-p", mycmd.Password(connInfo.Pass)) } if zip { dumpCmd.Append("--gzip") diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/job.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/job.go index 911944104b..6b3923d63c 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/job.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/job.go @@ -5,6 +5,8 @@ import ( "dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/backupsys" "dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/mymongo" "dbm-services/mongodb/db-tools/mongo-toolkit-go/pkg/report" + "os" + "path/filepath" "time" log "github.com/sirupsen/logrus" @@ -97,6 +99,7 @@ func backupIncrForPrevFull(option *BackupOption, bm *BackupMetaV2, } func uploadFileAndAppendToReportFile(option *BackupOption, result *BackupFileName) { + task, err := sendToBackupSystem(option, result) if err == nil { log.Infof("sendToBackupSystem file: %s, taskid: %s err:%v", task.FilePath, task.TaskId, err) @@ -164,6 +167,24 @@ func canSkip(option *BackupOption, lastFull, lastIncr *BackupFileName, now time. return isSkip } +// setOtherReadPerm 设置文件的perm 0744,父目录的perm 为0755 +func setOtherReadPerm(filePath string) error { + var err error + if err = os.Chmod(filePath, 0744); err != nil { + return err + } + + dirName := filepath.Dir(filePath) + if dirName == "/" { + return nil + } + + if err = os.Chmod(filePath, 0755); err != nil { + return err + } + return nil +} + func sendToBackupSystem(option *BackupOption, backupRec *BackupFileName) (task *backupsys.TaskInfo, err error) { if !option.SendToBackupSystem { task = &backupsys.TaskInfo{ @@ -177,6 +198,7 @@ func sendToBackupSystem(option *BackupOption, backupRec *BackupFileName) (task * } else { fileTag = option.IncrTag } + setOtherReadPerm(backupRec.FileName) if task, err = backupsys.UploadFile(backupRec.FileName, fileTag); err != nil { log.Warnf("UploadFile failed %v", err) return diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go index 888758c986..2d7a84dc0c 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/metav2.go @@ -72,7 +72,7 @@ type deleteReason struct { } func (d deleteReason) String() string { - return fmt.Sprintf("%s:%t", d.Reason, d.Flag) + return fmt.Sprintf("%s:%d", d.Reason, d.Flag) } // RemoveOldFileFirst 删除过期的备份文件 diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover.go index f2214b6106..757f314459 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover.go @@ -175,15 +175,15 @@ func DoMongoRestoreFULL(bin string, conn *mymongo.MongoHost, file *BackupFileNam restoreCmd := mycmd.New(bin, "--host", conn.Host, "--port", conn.Port, "--authenticationDatabase", conn.AuthDb, "--oplogReplay") if len(conn.User) > 0 { - restoreCmd.AppendArg("-u", conn.User) + restoreCmd.Append("-u", conn.User) } if len(conn.Pass) > 0 { - restoreCmd.AppendArg("-p", mycmd.Password(conn.Pass)) + restoreCmd.Append("-p", mycmd.Password(conn.Pass)) } if gzip { - restoreCmd.AppendArg("--gzip") + restoreCmd.Append("--gzip") } - restoreCmd.AppendArg("--dir", dumpDir) + restoreCmd.Append("--dir", mycmd.Val(dumpDir)) errReader, errWriter := io.Pipe() var outBuf, errBuf bytes.Buffer bgCmd := mycmd.NewExecCmdBg() diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper.go index a065cd49e3..bdda43fe16 100644 --- a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper.go +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper.go @@ -7,15 +7,22 @@ import ( "github.com/pkg/errors" ) -// ParseTimeStr 解析时间字符串 TODO: 改为RFC3339格式 +// ParseTimeStr 解析时间字符串 +// timeStr 合法格式为 "2006-01-02T15:04:05[+=08:00]" 默认为Asia/Chongqing func ParseTimeStr(timeStr string) (uint32, error) { var recoverTime uint32 loc, _ := time.LoadLocation("Asia/Chongqing") - if tv, err := time.ParseInLocation("2006-01-02T15:04:05", timeStr, loc); err == nil { + tv, err := time.ParseInLocation("2006-01-02T15:04:05", timeStr, loc) + if err == nil { + recoverTime = uint32(tv.Unix()) + return recoverTime, nil + } + tv, err = time.Parse(time.RFC3339, timeStr) + if err == nil { recoverTime = uint32(tv.Unix()) return recoverTime, nil } else { - return 0, errors.Wrap(err, "time.ParseInLocation") + return 0, errors.Wrap(err, "ParseTimeStr") } } diff --git a/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper_test.go b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper_test.go new file mode 100644 index 0000000000..ba357e9fd3 --- /dev/null +++ b/dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/pitr/recover_helper_test.go @@ -0,0 +1,26 @@ +package pitr + +import ( + "github.com/pkg/errors" + "testing" +) + +func TestParseTimeStr(t *testing.T) { + var tests = []struct { + input string + want error + }{ + {"2024-07-17T23:00:00+08:00", nil}, + {"2024-07-17T23:00:00-07:00", nil}, + {"2024-07-17T23:00:00Z07:00", errors.New("ParseTimeStr")}, + {"2024-07-17T23:00:00", nil}, + } + + for _, v := range tests { + if out, err := ParseTimeStr(v.input); !errors.Is(err, v.want) { + t.Errorf("ERR ParseTimeStr (%q) return out:%v err:(%v)", v.input, out, err) + } else { + t.Logf("OK ParseTimeStr (%q) return out:%v err:(%v)", v.input, out, err) + } + } +} diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/common.go b/dbm-services/mysql/db-remote-service/pkg/redis_rpc/common.go index cd5d9a5586..d05cbd388b 100644 --- a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/common.go +++ b/dbm-services/mysql/db-remote-service/pkg/redis_rpc/common.go @@ -31,6 +31,10 @@ type RedisQueryParams struct { DbNum int `json:"db_num"` Password string `json:"password"` Command string `json:"command"` + // ClientType webconsole or 其它任何值。 当这个值为webconsole时,调用redis-cli执行命令. + ClientType string `json:"client_type,omitempty"` + // Raw 只对ClientType为webconsole时生效. 为redis-cli添加--raw参数 raw模式下,能返回中文. 默认为false + Raw bool `json:"raw,omitempty"` } // StringWithoutPasswd 打印参数,不打印密码 @@ -77,7 +81,7 @@ func FormatName(msg string) (string, error) { return stringMsg, nil } -// DoRedisCmd 执行redis命令 +// DoRedisCmd 执行redis命令 by redis-cli func DoRedisCmd(address, redisPass, cmdline, dbNum string, raw bool) (string, error) { splitList := strings.Split(address, ":") redisHost, redisPort := splitList[0], splitList[1] @@ -126,7 +130,7 @@ func DoRedisCmd(address, redisPass, cmdline, dbNum string, raw bool) (string, er return string(out), nil } -// DoRedisCmdNew 执行redis命令 +// DoRedisCmdNew 执行redis命令 by go driver func DoRedisCmdNew(address, redisPass, cmd string, dbNum int) (ret string, err error) { var strBuilder strings.Builder cli, err := NewRedisClientWithTimeout(address, redisPass, dbNum, time.Second*2) diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/init.go b/dbm-services/mysql/db-remote-service/pkg/redis_rpc/init.go index 444ec5166d..759c7f16f7 100644 --- a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/init.go +++ b/dbm-services/mysql/db-remote-service/pkg/redis_rpc/init.go @@ -530,5 +530,7 @@ func init() { KeyStep: 0} RedisCommandTable["failover"] = &RedisCmdMeta{Name: "failover", Arity: -1, Sflags: adminFlag, FirstKey: 0, LastKey: 0, KeyStep: 0} + RedisCommandTable["select"] = &RedisCmdMeta{Name: "select", Arity: 1, Sflags: readOnlyFlag, FirstKey: 0, LastKey: 0, + KeyStep: 0} }) } diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_rpc.go b/dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_rpc.go index 5c6e068e29..411033835b 100644 --- a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_rpc.go +++ b/dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_rpc.go @@ -4,6 +4,7 @@ package redis_rpc import ( "fmt" "log/slog" + "strconv" "strings" "github.com/gin-gonic/gin" @@ -13,11 +14,27 @@ import ( type RedisRPCEmbed struct { } +// WebConsoleMode webconsole mode, using redis-cli to execute command +const WebConsoleMode = "webconsole" + // NewRedisRPCEmbed TODO func NewRedisRPCEmbed() *RedisRPCEmbed { return &RedisRPCEmbed{} } +// IsAdminCommand 是否为admin类的指令 +// 也许应该放开cluster nodes, info 之类. +func (r *RedisRPCEmbed) IsAdminCommand(cmdArgs []string) bool { + if len(cmdArgs) == 0 { + return false + } + cmd := strings.ToLower(cmdArgs[0]) + if _, ok := RedisCommandTable[cmd]; !ok { + return false + } + return strings.Contains(RedisCommandTable[cmd].Sflags, adminFlag) +} + // IsQueryCommand redis 解析命令 func (r *RedisRPCEmbed) IsQueryCommand(cmdArgs []string) bool { if len(cmdArgs) == 0 { @@ -118,8 +135,20 @@ func (r *RedisRPCEmbed) DoCommand(c *gin.Context) { return } - // ret, err := DoRedisCmd(address, password, formatCmd, strconv.Itoa(param.DbNum), true) - ret, err := DoRedisCmdNew(address, password, formatCmd, param.DbNum) + var ret string + // webConsole模式,不支持管理类命令 + if param.ClientType == WebConsoleMode { + if r.IsAdminCommand(cmdArgs) { + slog.Error("RedisRPCEmbed is admin command, not support", slog.String("command", formatCmd)) + SendResponse(c, 1, fmt.Sprintf("non-support admin command:'%s'", formatCmd), nil) + return + } + + ret, err = DoRedisCmd(address, password, formatCmd, strconv.Itoa(param.DbNum), param.Raw) + } else { + ret, err = DoRedisCmdNew(address, password, formatCmd, param.DbNum) + } + if err != nil { slog.Error("RedisRPCEmbed execute command", err, slog.String("address", address),