Skip to content

Commit

Permalink
feat(mongodb): create mongodb pwfile #5429
Browse files Browse the repository at this point in the history
  • Loading branch information
cycker committed Sep 10, 2024
2 parents c94e463 + ba17a23 commit 28f3374
Show file tree
Hide file tree
Showing 22 changed files with 275 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"dbm-services/mongodb/db-tools/mongo-toolkit-go/toolkit/logical"
"encoding/json"
"fmt"
"os"
"path"
"time"

Expand All @@ -31,7 +32,6 @@ const backupTypePhysical string = "physical" // 未实现

// backupParams 备份任务参数,由前端传入
type backupParams struct {
// 这个参数是不是可以从bk-dbmon.conf中获得?
BkDbmInstance config.BkDbmLabel `json:"bk_dbm_instance"`
IP string `json:"ip"`
Port int `json:"port"`
Expand Down Expand Up @@ -136,15 +136,19 @@ func (s *backupJob) doLogicalBackup() error {
partialArgs.DbList, partialArgs.IgnoreDbList,
partialArgs.ColList, partialArgs.IgnoreColList)

cmdLineList, cmdLine, err := helper.DumpPartial(tmpPath, "dump.log", filter)
cmdLineList, cmdLine, err, _ := helper.DumpPartial(tmpPath, "dump.log", filter)

if err != nil {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
return errors.Wrap(err, "LogicalDumpPartial")
if errors.Is(err, logical.ErrorNoMatchDb) {
s.runtime.Logger.Warn("NoMatchDb")
return nil
} else {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
return errors.Wrap(err, "LogicalDumpPartial")
}
}
s.runtime.Logger.Info("exec cmd success, cmd: %+v", cmdLineList)
} else {
// backupType = "dumpAll"
cmdLine, err := helper.LogicalDumpAll(tmpPath, "dump.log")
if err != nil {
s.runtime.Logger.Error("exec cmd fail, cmd: %s, error:%s", cmdLine, err)
Expand Down Expand Up @@ -180,6 +184,13 @@ func (s *backupJob) doLogicalBackup() error {
endTime = time.Now()
fSize, _ := util.GetFileSize(tarPath)
s.runtime.Logger.Info("backup file: %s size: %d", tarPath, fSize)

err = os.Chmod(tarPath, 0744)
if err != nil {
s.runtime.Logger.Error("chmod 0744 %s, err:%v", tarPath, err)
return errors.Wrap(err, "chmod")
}

// 上报备份记录。
task, err := backupsys.UploadFile(tarPath, s.ConfParams.BsTag)
// 如果此处失败,任务失败。
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package atommongodb

import (
"dbm-services/mongodb/db-tools/dbactuator/pkg/jobruntime"
"encoding/json"
"github.com/pkg/errors"
)

// hello 总是返回成功. 用于测试流程

// helloParams is the payload schema of the hello job; Init decodes the job's
// JSON payload into it. None of the fields are read by the hello job code
// itself (Run only logs and returns).
type helloParams struct {
	IP            string `json:"ip"`
	Port          int    `json:"port"`
	AdminUsername string `json:"adminUsername"`
	AdminPassword string `json:"adminPassword"`
}

// helloJob is the hello atom job: a job whose Run always succeeds, used to
// exercise the job flow. It embeds BaseJob and keeps the decoded payload.
type helloJob struct {
	BaseJob
	// ConfParams is filled by Init from the runtime's decoded payload.
	ConfParams *helloParams
}

// Param returns a tab-indented JSON rendering of a zero-valued helloParams,
// i.e. a template of the payload that Init decodes for this job.
func (s *helloJob) Param() string {
	// helloParams holds only strings and an int, so marshalling cannot fail;
	// the error is deliberately dropped.
	o, _ := json.MarshalIndent(helloParams{}, "", "\t")
	return string(o)
}

// NewHelloJob builds an empty hello job and hands it back as a
// jobruntime.JobRunner. The job is not usable until Init has been called.
func NewHelloJob() jobruntime.JobRunner {
	job := new(helloJob)
	return job
}

// Name reports the identifier of this atom job.
func (s *helloJob) Name() string {
	const jobName = "mongodb_hello"
	return jobName
}

// Run executes the atom job. For the hello job this only writes "Run" to the
// runtime logger and always reports success.
func (s *helloJob) Run() error {
	s.runtime.Logger.Info("Run")
	return nil
}

// Retry returns the fixed value 2.
// NOTE(review): presumably this is the number of retries the job runtime may
// attempt — confirm against the JobRunner contract.
func (s *helloJob) Retry() uint {
	const retries uint = 2
	return retries
}

// Rollback is the rollback hook of the atom job. The hello job has nothing to
// undo, so it always returns nil.
func (s *helloJob) Rollback() error {
	return nil
}

// Init binds the job to its runtime, refuses to run as root, and decodes the
// runtime's JSON payload into ConfParams.
//
// It returns an error when checkIsRootUser reports true, or when the payload
// cannot be unmarshalled into helloParams.
func (s *helloJob) Init(runtime *jobruntime.JobGenericRuntime) error {
	s.runtime = runtime
	// OsUser is cleared: per the original note, the job no longer needs sudo
	// and is expected to be started by a regular user.
	s.OsUser = ""
	if checkIsRootUser() {
		s.runtime.Logger.Error("This job cannot be executed as root user")
		return errors.New("This job cannot be executed as root user")
	}
	if err := json.Unmarshal([]byte(s.runtime.PayloadDecoded), &s.ConfParams); err != nil {
		tmpErr := errors.Wrap(err, "payload json.Unmarshal failed")
		// Log through an explicit "%s": the decoder's message can quote bytes
		// from the payload (including '%'), which must not be interpreted as
		// format verbs by the printf-style logger.
		s.runtime.Logger.Error("%s", tmpErr.Error())
		return tmpErr
	}

	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (job *installDbmonJob) updateConfigFile() error {
for fieldName, same := range eq {
if !same {
ndiff++
job.runtime.Logger.Info("config file %s %s has been changed", configFile, fieldName)
job.runtime.Logger.Info("config file %s %q has been changed", configFile, fieldName)
}
}
if ndiff == 0 {
Expand All @@ -220,8 +220,6 @@ func (job *installDbmonJob) updateConfigFile() error {
err = cpfile(srcFile, dstFile)
if err != nil {
job.runtime.Logger.Warn("cpfile %s to %s failed, err:%v", srcFile, dstFile, err)
} else {
job.runtime.Logger.Info("cpfile %s to %s success", srcFile, dstFile)
}
}
}
Expand Down Expand Up @@ -261,18 +259,20 @@ func (job *installDbmonJob) updateToolKit() error {
prevFile := path.Join(consts.PackageCachePath, fileName)
newFile := path.Join(consts.PackageSavePath, fileName)
dstDir := "/home/mysql/dbtools/mg/"
dstFile := path.Join(dstDir, fileName)
util.MkDirsIfNotExists([]string{dstDir})
skipped, err := untarMedia(prevFile, newFile, dstDir)
if err != nil {
return errors.Wrap(err, "updateDbTool")
return errors.Wrap(err, "updateToolKit")
}
if skipped {
job.runtime.Logger.Info(
"updateDbTool %s to %s skipped. Because the MD5 value of the file has not changed.",
"updateToolKit %s to %s skipped. Because the MD5 value of the file has not changed.",
newFile, dstDir)
return nil
}
job.runtime.Logger.Info("updateDbTool %s to %s done", newFile, dstDir)
os.Chmod(dstFile, 0755)
job.runtime.Logger.Info("updateToolKit %s to %s done", newFile, dstDir)
cpfile(newFile, prevFile) // 只在成功untar后才能cp
return nil
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func (job *installDbmonJob) startDbmon() error {
job.updateDbmonFlag = true // always restart bk-dbmon

if isRunning {
job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed. no need to restart. ")
job.runtime.Logger.Info("bk-dbmon is running, and md5 is changed, restart dbmon")
if err := exec.Command("kill", "-9", strconv.Itoa(pid)).Run(); err != nil {
return errors.Wrap(err, "kill -9")
} else {
Expand Down Expand Up @@ -367,7 +367,6 @@ func untarMedia(prevFile, newFile, dstDir string) (skipped bool, err error) {
}
}

// 是否要将output打印出来呢?不了吧,太多了.
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type removeNsJob struct {
MongoInst *mymongo.MongoHost
MongoClient *mongo.Client
tmp struct {
Err error
NsList []logical.DbCollection
NsIndex map[string][]*mongo.IndexSpecification
}
Expand Down Expand Up @@ -105,6 +106,12 @@ func connectPrimary(host *mymongo.MongoHost) (client *mongo.Client, err error) {
}

func (s *removeNsJob) dropCollection() (err error) {

if s.tmp.Err != nil && errors.Is(s.tmp.Err, logical.ErrorNoMatchDb) {
s.runtime.Logger.Info("no matched database and collection.")
return nil
}

err = s.backupIndex()
if err != nil {
return err
Expand Down Expand Up @@ -285,7 +292,15 @@ func (s *removeNsJob) getNsList() (err error) {

dbColList, err := logical.GetDbCollectionWithFilter(s.MongoInst.Host, s.MongoInst.Port,
s.MongoInst.User, s.MongoInst.Pass, s.MongoInst.AuthDb, filter)

s.runtime.Logger.Info(fmt.Sprintf("GetDbCollectionWithFilter: return %+v %v", dbColList, err))

if err != nil {
// 如果没有匹配的库表,算作成功. 返回Error给后面的流程处理.
if errors.Is(err, logical.ErrorNoMatchDb) {
s.tmp.Err = err
return nil
}
return errors.Wrap(err, "GetDbCollectionWithFilter")
}
// skip sys db
Expand All @@ -297,10 +312,9 @@ func (s *removeNsJob) getNsList() (err error) {
}

if len(s.tmp.NsList) == 0 {
return errors.Errorf("no matched db and col found")
s.tmp.Err = logical.ErrorNoMatchDb
}
s.runtime.Logger.Info(fmt.Sprintf("getNsList:%+v", s.tmp.NsList))

return nil
} else {
s.tmp.NsList, err = logical.GetDbCollection(s.MongoInst.Host, s.MongoInst.Port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (m *JobGenericManager) RegisterAtomJob() {
atommongodb.NewMongoStartProcess,
atommongodb.NewReplacePackage,
atommongodb.NewMongoSetFCV,
atommongodb.NewHelloJob,
} {
m.atomJobMapper[f().Name()] = f
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type CheckHealthJob struct { // NOCC:golint/naming(其他:设计如此)
}

const mongoBin = "/usr/local/mongodb/bin/mongo"
const startMongoScript = "/usr/local/mongodb/bin/start_mongo.sh"

// Run 执行例行备份. 被cron对象调用
func (job *CheckHealthJob) Run() {
Expand Down Expand Up @@ -94,9 +95,9 @@ func (job *CheckHealthJob) runOneServer(svrItem *config.ConfServerItem) {
return
}

// 不存在,尝试启动
// 启动成功: 发送消息LoginSuccess
// 启动失败: 发送消息LoginFailed
// 进程不存在,尝试启动
// 启动成功: 发送消息LoginSuccess
// 启动失败: 发送消息LoginFailed
startMongo(svrItem.Port)
err = checkService(loginTimeout, svrItem)
if err == nil {
Expand Down Expand Up @@ -161,8 +162,9 @@ func checkService(loginTimeout int, svrItem *config.ConfServerItem) error {
}

func startMongo(port int) error {
cmd := "/usr/local/mongodb/bin/start.sh"
_, err := DoCommandWithTimeout(60, cmd, fmt.Sprintf("%d", port))
ret, err := DoCommandWithTimeout(60, startMongoScript, fmt.Sprintf("%d", port))
mylog.Logger.Info(fmt.Sprintf("exec %s return err:%v", ret.Cmdline, err))

if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions dbm-services/mongodb/db-tools/dbmon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,15 @@ Wait each job finish,the job result would write to local file, and other program
entryID, err = c.AddJob(row.cron,
cron.NewChain(cron.SkipIfStillRunning(mylog.AdapterLog)).Then(row.job))
if err != nil {
log.Panicf("addjob fail,jobName: %s entryID:%d,err:%v\n", row.name, entryID, err)
log.Panicf("addjob fail,jobName: %s entryID:%d,err:%v", row.name, entryID, err)
return
}
mylog.Logger.Info(fmt.Sprintf("AddJob %s, entryID:%d ", row.name, entryID))
mylog.Logger.Info("AddJob success",
zapcore.Field{Key: "jobName", Type: zapcore.StringType, String: row.name},
zapcore.Field{Key: "entryID", Type: zapcore.Int64Type, Integer: int64(entryID)},
)
}
mylog.Logger.Info(fmt.Sprintf("start dbmon, Listen:%s\n", config.GlobalConf.HttpAddress))
mylog.Logger.Info(fmt.Sprintf("start dbmon, Listen:%s", config.GlobalConf.HttpAddress))
c.Start()
// go func() {
// // for go tool pprof. curl http://127.0.0.1:6600/debug/pprof/heap
Expand Down
2 changes: 1 addition & 1 deletion dbm-services/mongodb/db-tools/dbmon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type BkDbmLabel struct {
BkCloudID int64 `json:"bk_cloud_id" mapstructure:"bk_cloud_id" yaml:"bk_cloud_id"`
BkBizID int `json:"bk_biz_id" mapstructure:"bk_biz_id" yaml:"bk_biz_id" yaml:"bk_biz_id"`
App string `json:"app" mapstructure:"app" yaml:"app"`
AppName string `json:"app_name" mapstructure:"-" yaml:"app_name"`
AppName string `json:"app_name" mapstructure:"app_name" yaml:"app_name"`
ClusterDomain string `json:"cluster_domain" mapstructure:"cluster_domain" yaml:"cluster_domain"`
ClusterId int64 `json:"cluster_id" mapstructure:"cluster_id" yaml:"cluster_id"`
ClusterName string `json:"cluster_name" mapstructure:"cluster_name" yaml:"cluster_name"`
Expand Down
13 changes: 3 additions & 10 deletions dbm-services/mongodb/db-tools/dbmon/mylog/mylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import (
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"dbm-services/mongodb/db-tools/dbmon/pkg/consts"

"github.com/robfig/cron/v3"
"github.com/spf13/viper"
"go.uber.org/zap"
Expand Down Expand Up @@ -63,10 +60,6 @@ func InitRotateLoger() {
logDir := filepath.Join(currDir, "logs")
mkdirIfNotExistsWithPerm(logDir, 0750)

chownCmd := fmt.Sprintf("chown -R %s.%s %s", consts.MysqlAaccount, consts.MysqlGroup, logDir)
cmd := exec.Command("bash", "-c", chownCmd)
cmd.Run()

cfg := zap.NewProductionConfig()
cfg.EncoderConfig = zapcore.EncoderConfig{
MessageKey: "msg",
Expand All @@ -86,9 +79,9 @@ func InitRotateLoger() {

lj := zapcore.AddSync(&lumberjack.Logger{
Filename: filepath.Join(logDir, "bk-dbmon.log"),
MaxSize: 256, // 单个日志文件大小,单位MB
MaxBackups: 10, // 最多保存10个文件
MaxAge: 15, // 最多保存15天内的日志
MaxSize: 64, // 单个日志文件大小,单位MB
MaxBackups: 7, // keep at most 7 rotated log files
MaxAge: 15, // 最多保存15天内的日志
LocalTime: true,
Compress: true,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (bm *BkMonitorEventSender) SendTimeSeriesMsg(dataId int64, token string, ta
"-report.message.kind", "timeseries",
"-report.agent.address", bm.AgentAddress,
"-report.message.body", string(tempBytes))
mylog.Logger.Info(sendCmd.GetCmdLine("", false))
_, err = sendCmd.Run2(20 * time.Second)
mylog.Logger.Info(fmt.Sprintf("%s err: %v", sendCmd.GetCmdLine("", false), err))
return
}

Expand Down
Loading

0 comments on commit 28f3374

Please sign in to comment.