From 0859c2869b9c493bbc9b2b52a62dc7440a6ec7fc Mon Sep 17 00:00:00 2001 From: xiepaup Date: Thu, 29 Aug 2024 14:44:08 +0800 Subject: [PATCH] feat(mongodb): mongos DBHA support #6419 --- .../common/dbha/ha-module/config/config.go | 7 + .../dbha/ha-module/constvar/constant.go | 6 + .../dbmodule/mongodb/mongos_callback.go | 156 +++++++++++++++ .../dbmodule/mongodb/mongos_detect.go | 183 ++++++++++++++++++ .../dbmodule/mongodb/mongos_switch.go | 125 ++++++++++++ .../dbha/ha-module/dbmodule/register.go | 8 + 6 files changed, 485 insertions(+) create mode 100644 dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_callback.go create mode 100644 dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_detect.go create mode 100644 dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_switch.go diff --git a/dbm-services/common/dbha/ha-module/config/config.go b/dbm-services/common/dbha/ha-module/config/config.go index 1c69e8151f..13167dfefa 100644 --- a/dbm-services/common/dbha/ha-module/config/config.go +++ b/dbm-services/common/dbha/ha-module/config/config.go @@ -128,6 +128,8 @@ type DBConfig struct { Riak RiakConfig `yaml:"riak"` // Sqlserver instance detect info Sqlserver SqlserverConfig `yaml:"sqlserver"` + // MongoDB mongo config. + MongoDB MongoConfig `yaml:"mongodb"` } // MySQLConfig mysql instance connect info @@ -156,6 +158,11 @@ type SqlserverConfig struct { Timeout int `yaml:"timeout"` } +// MongoConfig mongo +type MongoConfig struct { + Timeout int `yaml:"timeout"` +} + // SSHConfig ssh detect configure type SSHConfig struct { Port int `yaml:"port"` diff --git a/dbm-services/common/dbha/ha-module/constvar/constant.go b/dbm-services/common/dbha/ha-module/constvar/constant.go index 4b1f41ba00..b4e2aab7d9 100644 --- a/dbm-services/common/dbha/ha-module/constvar/constant.go +++ b/dbm-services/common/dbha/ha-module/constvar/constant.go @@ -61,6 +61,9 @@ const ( TendisplusMetaType = "tendisplus" // SqlserverMetatype storage layer type name in SqlserverHa SqlserverMetatype = "sqlserver_ha" + + // Mongos MONGOS = EnumField("mongos", _("mongos")) # mongos + Mongos = "mongos" ) // instance role in cmdb @@ -108,6 +111,9 @@ const ( Riak = "riak" // SqlserverHA TODO SqlserverHA = "sqlserver_ha" + + // MongoShardedCluster = EnumField("MongoShardedCluster", _("Mongo分片集群")) + MongoShardCluster = "MongoShardedCluster" ) // wrapper name in TenDBCluster diff --git a/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_callback.go b/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_callback.go new file mode 100644 index 0000000000..ff3cbebc89 --- /dev/null +++ b/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_callback.go @@ -0,0 +1,156 @@ +package mongodb + +import ( + "dbm-services/common/dbha/ha-module/client" + "dbm-services/common/dbha/ha-module/config" + "dbm-services/common/dbha/ha-module/constvar" + "dbm-services/common/dbha/ha-module/dbutil" + "dbm-services/common/dbha/ha-module/log" + "encoding/json" + "fmt" + "strconv" +) + +// MongosInstanceInfoDetail 实例信息 +type MongosInstanceInfoDetail struct { + IP string `json:"ip"` + Port int `json:"port"` + BKIdcCityID int `json:"bk_idc_city_id"` + InstanceRole string `json:"instance_role"` + Status string `json:"status"` + Cluster string `json:"cluster"` + BKBizID int `json:"bk_biz_id"` + ClusterType string `json:"cluster_type"` + MachineType string `json:"machine_type"` +} + +// NewMongosInstanceByCmDB unmarshal cmdb instances to agent detect instance struct +func NewMongosInstanceByCmDB(instances []interface{}, Conf *config.Config) ([]dbutil.DataBaseDetect, error) { + var ( + err error + unmarshalIns []*MongosDetectInstanceInfoFromCmDB + ret []dbutil.DataBaseDetect + ) + + if unmarshalIns, err = UnMarshalMongosInstanceByCmdb(instances, + constvar.MongoShardCluster, constvar.Mongos); err != nil { + return nil, err + } + + // cmdb的数据结构转换为agent用来探测的数据结构 + for _, uIns := range unmarshalIns { + ret = append(ret, NewMongosDetectInstanceForAgent(uIns, Conf)) + } + + return ret, err +} + +// DeserializeMongos 反序列化从Agent上报上来的故障实例 +func DeserializeMongos(jsonInfo []byte, conf *config.Config) (dbutil.DataBaseDetect, error) { + response := MongosDetectResponse{} + err := json.Unmarshal(jsonInfo, &response) + if err != nil { + log.Logger.Errorf("json unmarshal failed. jsoninfo:\n%s\n, err:%s", string(jsonInfo), err.Error()) + return nil, err + } + var ret dbutil.DataBaseDetect + // gm将agent上报的数据结构转换为gdm通道接收的数据结构 + ret = NewMongosDetectInstanceForGdm(&response, constvar.MongoShardCluster, conf) + return ret, nil +} + +// NewMongosSwitchInstance unmarshal cmdb instances to switch instance struct +func NewMongosSwitchInstance(instances []interface{}, conf *config.Config) ([]dbutil.DataBaseSwitch, error) { + var ret []dbutil.DataBaseSwitch + for _, v := range instances { + ins := dbutil.DBInstanceInfoDetail{} + rawData, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("marshal instance info failed:%s", err.Error()) + } + if err = json.Unmarshal(rawData, &ins); err != nil { + return nil, fmt.Errorf("unmarshal instance info failed:%s", err.Error()) + } + + // 用于切换的实例信息 + swIns := MongosSwitch{ + BaseSwitch: dbutil.BaseSwitch{ + Ip: ins.IP, + Port: ins.Port, + IdcID: ins.BKIdcCityID, + Status: ins.Status, + App: strconv.Itoa(ins.BKBizID), + ClusterType: ins.ClusterType, + MetaType: ins.MachineType, + Cluster: ins.Cluster, + Config: conf, + CmDBClient: client.NewCmDBClient(&conf.DBConf.CMDB, conf.GetCloudId()), + HaDBClient: client.NewHaDBClient(&conf.DBConf.HADB, conf.GetCloudId()), + }, + Role: ins.MachineType, + } + + // only need mongos instance ; ignore mongdb and mongo_config + if ins.MachineType == constvar.Mongos { + // DNS + if ins.BindEntry.Dns == nil { + swIns.ApiGw.DNSFlag = false + } else { + swIns.ApiGw.DNSFlag = true + swIns.ApiGw.ServiceEntry.Dns = ins.BindEntry.Dns + } + + // CLB + if ins.BindEntry.Clb != nil && len(ins.BindEntry.Clb) > 0 { + swIns.ApiGw.CLBFlag = true + swIns.ApiGw.ServiceEntry.Clb = ins.BindEntry.Clb + } else { + swIns.ApiGw.CLBFlag = false + } + ret = append(ret, &swIns) + } + } + return ret, nil +} + +// UnMarshalMongosInstanceByCmdb convert cmdb instance info to MongosDetectInstanceInfoFromCmDB +func UnMarshalMongosInstanceByCmdb(instances []interface{}, + uClusterType string, uMetaType string) ([]*MongosDetectInstanceInfoFromCmDB, error) { + var ( + ret []*MongosDetectInstanceInfoFromCmDB + ) + cache := map[string]*MongosDetectInstanceInfoFromCmDB{} + + for _, v := range instances { + ins := MongosInstanceInfoDetail{} + rawData, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("marshal instance info failed:%s", err.Error()) + } + if err = json.Unmarshal(rawData, &ins); err != nil { + return nil, fmt.Errorf("unmarshal instance info failed:%s", err.Error()) + } + if ins.ClusterType != uClusterType || ins.MachineType != uMetaType || + (ins.Status != constvar.RUNNING && ins.Status != constvar.AVAILABLE) { + continue + } + cacheIns, ok := cache[ins.IP] + //only need detect the minimum port instance + if !ok || ok && ins.Port < cacheIns.Port { + cache[ins.IP] = &MongosDetectInstanceInfoFromCmDB{ + Ip: ins.IP, + Port: ins.Port, + App: strconv.Itoa(ins.BKBizID), + ClusterType: ins.ClusterType, + MetaType: ins.MachineType, + Cluster: ins.Cluster, + } + } + } + + for _, cacheIns := range cache { + ret = append(ret, cacheIns) + } + + return ret, nil +} diff --git a/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_detect.go b/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_detect.go new file mode 100644 index 0000000000..97b676774e --- /dev/null +++ b/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_detect.go @@ -0,0 +1,183 @@ +package mongodb + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "regexp" + "time" + + "dbm-services/common/dbha/ha-module/config" + "dbm-services/common/dbha/ha-module/constvar" + "dbm-services/common/dbha/ha-module/dbutil" + "dbm-services/common/dbha/ha-module/log" + "dbm-services/common/dbha/ha-module/types" + "dbm-services/common/dbha/ha-module/util" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// MongosDetectInstance mongos instance detect struct +type MongosDetectInstance struct { + dbutil.BaseDetectDB + User string + Pass string + Timeout int +} + +var mongoRegex *regexp.Regexp + +func init() { + // like 2.4.12 3.0.5 6.0.6 + mongoRegex = regexp.MustCompile(`^[0-9]+.[0-9]+.[0-9]+`) +} + +// MongosDetectResponse mongos instance response struct +type MongosDetectResponse struct { + dbutil.BaseDetectDBResponse +} + +// MongosDetectInstanceInfoFromCmDB mongos instance detect struct in cmdb +type MongosDetectInstanceInfoFromCmDB struct { + Ip string + Port int + App string + ClusterType string + MetaType string + Cluster string +} + +// NewMongosDetectInstanceForAgent convert cmdb info to detect info +func NewMongosDetectInstanceForAgent(ins *MongosDetectInstanceInfoFromCmDB, + conf *config.Config) *MongosDetectInstance { + return &MongosDetectInstance{ + BaseDetectDB: dbutil.BaseDetectDB{ + Ip: ins.Ip, + Port: ins.Port, + App: ins.App, + DBType: types.DBType(ins.ClusterType), + ReporterTime: time.Unix(0, 0), + ReportInterval: conf.AgentConf.ReportInterval + rand.Intn(20), + Status: constvar.DBCheckSuccess, + Cluster: ins.Cluster, + }, + Timeout: conf.DBConf.MongoDB.Timeout, + } +} + +// NewMongosDetectInstanceForGdm convert api response info into detect info +func NewMongosDetectInstanceForGdm(ins *MongosDetectResponse, + dbType string, conf *config.Config) *MongosDetectInstance { + return &MongosDetectInstance{ + BaseDetectDB: dbutil.BaseDetectDB{ + Ip: ins.DBIp, + Port: ins.DBPort, + App: ins.App, + DBType: types.DBType(dbType), + ReporterTime: time.Unix(0, 0), + ReportInterval: conf.AgentConf.ReportInterval + rand.Intn(20), + Status: types.CheckStatus(ins.Status), + Cluster: ins.Cluster, + }, + Timeout: conf.DBConf.MongoDB.Timeout, + } +} + +// Detection mongo 存活探测 PING (5s超时) +func (m *MongosDetectInstance) Detection() (err error) { + if err = m.CheckMongo(); err != nil { + m.Status = constvar.DBCheckFailed + log.Logger.Debugf("mongos check instance failed . %s#%d:%+v", m.Ip, m.Port, err) + + if sshErr := m.CheckSSH(); sshErr != nil { + if util.CheckSSHErrIsAuthFail(sshErr) { + m.Status = constvar.SSHAuthFailed + log.Logger.Errorf("mongos check ssh auth failed.ip:%s,port:%d,app:%s,status:%s", + m.Ip, m.Port, m.App, m.Status) + } else { + m.Status = constvar.SSHCheckFailed + log.Logger.Errorf("mongos check ssh failed.ip:%s,port:%d,app:%s,status:%s", + m.Ip, m.Port, m.App, m.Status) + } + return sshErr + } + + log.Logger.Debugf("mongos check ssh success. ip:%s, port:%d, app:%s", m.Ip, m.Port, m.App) + m.Status = constvar.SSHCheckSuccess + return nil + } + + m.Status = constvar.DBCheckSuccess + log.Logger.Infof("mongos check instance success . %s#%d", m.Ip, m.Port) + return nil +} + +// MongosDetectInstance check whether mongo alive +func (m *MongosDetectInstance) CheckMongo() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout)*time.Second) + defer cancel() + // uri := "mongodb://user:****@localhost:27017" + uri := fmt.Sprintf("mongodb://%s:%d", m.Ip, m.Port) + client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) + if err != nil { + log.Logger.Warnf("connect mongo failed. %s#%d, err:%s", m.Ip, m.Port, err.Error()) + return err + } + // release connection. + defer func() { + if err = client.Disconnect(ctx); err != nil { + log.Logger.Warnf("close mongo connection failed. %s#%d, err:%s", m.Ip, m.Port, err.Error()) + } + }() + + // if err = client.Ping(ctx, readpref.Primary()); err != nil { + // log.Logger.Errorf("ping mongo [ %s ] failed. ip:%s, port:%d, err:%s", m.Ip, m.Port, err.Error()) + // return err + // } + var buildInfoDoc bson.M + if err := client.Database("dbha").RunCommand(ctx, + bson.D{bson.E{Key: "buildInfo", Value: 1}}).Decode(&buildInfoDoc); err != nil { + log.Logger.Errorf("failed to run buildInfo command: %s#%d, err:%+v", m.Ip, m.Port, err) + return err + } + + mong_version, _ := buildInfoDoc["version"].(string) + if !mongoRegex.MatchString(mong_version) { + return fmt.Errorf("mongos check failed:%s", mong_version) + } + + return nil +} + +// Serialization serialize mongos instance info +func (m *MongosDetectInstance) Serialization() ([]byte, error) { + response := MongosDetectResponse{ + BaseDetectDBResponse: m.NewDBResponse(), + } + + resByte, err := json.Marshal(&response) + + if err != nil { + log.Logger.Errorf("mongo serialization failed. err:%s", err.Error()) + return []byte{}, err + } + + return resByte, nil +} + +// CheckSSH mongo do ssh check +func (m *MongosDetectInstance) CheckSSH() error { + touchFile := fmt.Sprintf("%s_%s_%d", m.SshInfo.Dest, "agent", m.Port) + + touchStr := fmt.Sprintf("touch %s && if [ -d \"/data1/dbha\" ]; then touch /data1/dbha/%s ; fi "+ + "&& if [ -d \"/data/dbha\" ]; then touch /data/dbha/%s ; fi", touchFile, touchFile, touchFile) + + if err := m.DoSSH(touchStr); err != nil { + log.Logger.Errorf("MongoDetection do ssh failed. err:%s", err.Error()) + return err + } + return nil +} diff --git a/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_switch.go b/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_switch.go new file mode 100644 index 0000000000..0dc710d4a6 --- /dev/null +++ b/dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_switch.go @@ -0,0 +1,125 @@ +package mongodb + +import ( + "dbm-services/common/dbha/ha-module/constvar" + "dbm-services/common/dbha/ha-module/dbutil" + "dbm-services/common/dbha/ha-module/log" + "fmt" +) + +type GwInfo struct { + CLBFlag bool + DNSFlag bool + ServiceEntry dbutil.BindEntry +} + +// MongosSwitch defined mongo switch struct +type MongosSwitch struct { + dbutil.BaseSwitch + ApiGw GwInfo + Role string +} + +// # switch operation +// # step 1, check if mongos can switch +// # step 2, mark the current inst as can switch in sw_queue +// # step 3, mark the current inst to in_switch status in tb_mon_switch_queue +// # step 5, delete the instance from that dns, print the instances number before/after switch +// # step 6, update the dns_param table to make the dns change take affect +// # step 7, return + +// GetRole get mysql role type +func (ins *MongosSwitch) GetRole() string { + return ins.Role +} + +// ShowSwitchInstanceInfo show mongo instance's switch info +func (ins *MongosSwitch) ShowSwitchInstanceInfo() string { + str := fmt.Sprintf("<%s#%d IDC:%d Role:%s Status:%s Bzid:%s ClusterType:%s MachineType:%s>", + ins.Ip, ins.Port, ins.IdcID, ins.Role, ins.Status, ins.App, ins.ClusterType, + ins.MetaType) + return str +} + +// CheckSwitch check before switch [Step 1] +func (ins *MongosSwitch) CheckSwitch() (bool, error) { + if ins.Role != constvar.Mongos { + err := fmt.Errorf("info:{%s} unknown role", ins.ShowSwitchInstanceInfo()) + ins.ReportLogs(constvar.FailResult, err.Error()) + return false, err + } + + // check dns keep at least One. + var err error + if ins.ApiGw.DNSFlag && len(ins.ApiGw.ServiceEntry.Dns) >= 1 { + for idx, bindInfo := range ins.ApiGw.ServiceEntry.Dns { + ins.ReportLogs(constvar.InfoResult, + fmt.Sprintf("check dns bind items %s had bindIPs:%d", bindInfo.DomainName, len(bindInfo.BindIps))) + if len(bindInfo.BindIps) <= 1 { + err = fmt.Errorf("%s|[%d:%s]:bind no more than one IPs:%+v", + err.Error(), idx, bindInfo.DomainName, bindInfo.BindIps) + ins.ReportLogs(constvar.FailResult, err.Error()) + } + } + } + if err != nil { + return false, err + } + + return true, nil +} + +// DoSwitch do switch [Step 2] +func (ins *MongosSwitch) DoSwitch() error { + ins.ReportLogs(constvar.InfoResult, fmt.Sprintf("handle mongos switch[%s:%d]", ins.Ip, ins.Port)) + err := ins.KickOffDns() + cErr := ins.KickOffClb() + + if err != nil { + err := fmt.Errorf("mongos kickoff DNS failed,[%s] err:%s", ins.ShowSwitchInstanceInfo(), err.Error()) + ins.ReportLogs(constvar.FailResult, err.Error()) + return err + } + + if cErr != nil { + err := fmt.Errorf("mongos kickoff CLB failed,[%s] err:%s", ins.ShowSwitchInstanceInfo(), err.Error()) + ins.ReportLogs(constvar.FailResult, err.Error()) + return cErr + } + + succLog := fmt.Sprintf("mongos do switch success, HasDNS[%t] HasCLB[%t]", ins.ApiGw.DNSFlag, ins.ApiGw.CLBFlag) + ins.ReportLogs(constvar.InfoResult, succLog) + return nil +} + +// UpdateMetaInfo swap master, slave 's meta info in cmdb [Step 3] +func (ins *MongosSwitch) UpdateMetaInfo() error { + return nil +} + +// RollBack do switch rollback +func (ins *MongosSwitch) RollBack() error { + return nil +} + +// KickOffDns kick instance from dns +func (ins *MongosSwitch) KickOffDns() error { + if !ins.ApiGw.DNSFlag { + log.Logger.Infof("no need kickoff DNS,info:%s", ins.ShowSwitchInstanceInfo()) + return nil + } + + // kick off instance from dns + return ins.DeleteNameService(dbutil.BindEntry{Dns: ins.ApiGw.ServiceEntry.Dns}) +} + +// KickOffClb TODO +func (ins *MongosSwitch) KickOffClb() error { + if !ins.ApiGw.CLBFlag { + log.Logger.Infof("no need kickoff CLB,info:%s", ins.ShowSwitchInstanceInfo()) + return nil + } + + // kick off instance from clb + return ins.DeleteNameService(dbutil.BindEntry{Clb: ins.ApiGw.ServiceEntry.Clb}) +} diff --git a/dbm-services/common/dbha/ha-module/dbmodule/register.go b/dbm-services/common/dbha/ha-module/dbmodule/register.go index 09cd0e7175..408da1efb9 100644 --- a/dbm-services/common/dbha/ha-module/dbmodule/register.go +++ b/dbm-services/common/dbha/ha-module/dbmodule/register.go @@ -4,6 +4,7 @@ import ( "dbm-services/common/dbha/ha-module/config" "dbm-services/common/dbha/ha-module/constvar" "dbm-services/common/dbha/ha-module/dbmodule/dbmysql" + "dbm-services/common/dbha/ha-module/dbmodule/mongodb" "dbm-services/common/dbha/ha-module/dbmodule/redis" "dbm-services/common/dbha/ha-module/dbmodule/riak" "dbm-services/common/dbha/ha-module/dbmodule/sqlserver" @@ -96,4 +97,11 @@ func init() { DeserializeCallback: sqlserver.DeserializeSqlserver, GetSwitchInstanceInformation: sqlserver.NewSqlserverSwitchInstance, } + + // Mongos used + DBCallbackMap[constvar.MongoShardCluster] = Callback{ + FetchDBCallback: mongodb.NewMongosInstanceByCmDB, + DeserializeCallback: mongodb.DeserializeMongos, + GetSwitchInstanceInformation: mongodb.NewMongosSwitchInstance, + } }