Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mongodb): mongos DBHA support #6419 #6529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dbm-services/common/dbha/ha-module/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type DBConfig struct {
Riak RiakConfig `yaml:"riak"`
// Sqlserver instance detect info
Sqlserver SqlserverConfig `yaml:"sqlserver"`
// MongoDB mongo config.
MongoDB MongoConfig `yaml:"mongodb"`
}

// MySQLConfig mysql instance connect info
Expand Down Expand Up @@ -156,6 +158,11 @@ type SqlserverConfig struct {
Timeout int `yaml:"timeout"`
}

// MongoConfig mongo
type MongoConfig struct {
Timeout int `yaml:"timeout"`
}

// SSHConfig ssh detect configure
type SSHConfig struct {
Port int `yaml:"port"`
Expand Down
6 changes: 6 additions & 0 deletions dbm-services/common/dbha/ha-module/constvar/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
TendisplusMetaType = "tendisplus"
// SqlserverMetatype storage layer type name in SqlserverHa
SqlserverMetatype = "sqlserver_ha"

// Mongos MONGOS = EnumField("mongos", _("mongos")) # mongos
Mongos = "mongos"
)

// instance role in cmdb
Expand Down Expand Up @@ -108,6 +111,9 @@ const (
Riak = "riak"
// SqlserverHA TODO
SqlserverHA = "sqlserver_ha"

// MongoShardedCluster = EnumField("MongoShardedCluster", _("Mongo分片集群"))
MongoShardCluster = "MongoShardedCluster"
)

// wrapper name in TenDBCluster
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package mongodb

import (
"dbm-services/common/dbha/ha-module/client"
"dbm-services/common/dbha/ha-module/config"
"dbm-services/common/dbha/ha-module/constvar"
"dbm-services/common/dbha/ha-module/dbutil"
"dbm-services/common/dbha/ha-module/log"
"encoding/json"
"fmt"
"strconv"
)

// MongosInstanceInfoDetail 实例信息
type MongosInstanceInfoDetail struct {
IP string `json:"ip"`
Port int `json:"port"`
BKIdcCityID int `json:"bk_idc_city_id"`
InstanceRole string `json:"instance_role"`
Status string `json:"status"`
Cluster string `json:"cluster"`
BKBizID int `json:"bk_biz_id"`
ClusterType string `json:"cluster_type"`
MachineType string `json:"machine_type"`
}

// NewMongosInstanceByCmDB unmarshal cmdb instances to agent detect instance struct
func NewMongosInstanceByCmDB(instances []interface{}, Conf *config.Config) ([]dbutil.DataBaseDetect, error) {
var (
err error
unmarshalIns []*MongosDetectInstanceInfoFromCmDB
ret []dbutil.DataBaseDetect
)

if unmarshalIns, err = UnMarshalMongosInstanceByCmdb(instances,
constvar.MongoShardCluster, constvar.Mongos); err != nil {
return nil, err
}

// cmdb的数据结构转换为agent用来探测的数据结构
for _, uIns := range unmarshalIns {
ret = append(ret, NewMongosDetectInstanceForAgent(uIns, Conf))
}

return ret, err
}

// DeserializeMongos 反序列化从Agent上报上来的故障实例
func DeserializeMongos(jsonInfo []byte, conf *config.Config) (dbutil.DataBaseDetect, error) {
response := MongosDetectResponse{}
err := json.Unmarshal(jsonInfo, &response)
if err != nil {
log.Logger.Errorf("json unmarshal failed. jsoninfo:\n%s\n, err:%s", string(jsonInfo), err.Error())
return nil, err
}
var ret dbutil.DataBaseDetect
// gm将agent上报的数据结构转换为gdm通道接收的数据结构
ret = NewMongosDetectInstanceForGdm(&response, constvar.MongoShardCluster, conf)
return ret, nil
}

// NewMongosSwitchInstance unmarshal cmdb instances to switch instance struct
func NewMongosSwitchInstance(instances []interface{}, conf *config.Config) ([]dbutil.DataBaseSwitch, error) {
var ret []dbutil.DataBaseSwitch
for _, v := range instances {
ins := dbutil.DBInstanceInfoDetail{}
rawData, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("marshal instance info failed:%s", err.Error())
}
if err = json.Unmarshal(rawData, &ins); err != nil {
return nil, fmt.Errorf("unmarshal instance info failed:%s", err.Error())
}

// 用于切换的实例信息
swIns := MongosSwitch{
BaseSwitch: dbutil.BaseSwitch{
Ip: ins.IP,
Port: ins.Port,
IdcID: ins.BKIdcCityID,
Status: ins.Status,
App: strconv.Itoa(ins.BKBizID),
ClusterType: ins.ClusterType,
MetaType: ins.MachineType,
Cluster: ins.Cluster,
Config: conf,
CmDBClient: client.NewCmDBClient(&conf.DBConf.CMDB, conf.GetCloudId()),
HaDBClient: client.NewHaDBClient(&conf.DBConf.HADB, conf.GetCloudId()),
},
Role: ins.MachineType,
}

// only need mongos instance ; ignore mongdb and mongo_config
if ins.MachineType == constvar.Mongos {
// DNS
if ins.BindEntry.Dns == nil {
swIns.ApiGw.DNSFlag = false
} else {
swIns.ApiGw.DNSFlag = true
swIns.ApiGw.ServiceEntry.Dns = ins.BindEntry.Dns
}

// CLB
if ins.BindEntry.Clb != nil && len(ins.BindEntry.Clb) > 0 {
swIns.ApiGw.CLBFlag = true
swIns.ApiGw.ServiceEntry.Clb = ins.BindEntry.Clb
} else {
swIns.ApiGw.CLBFlag = false
}
ret = append(ret, &swIns)
}
}
return ret, nil
}

// UnMarshalMongosInstanceByCmdb convert cmdb instance info to MongosDetectInstanceInfoFromCmDB
func UnMarshalMongosInstanceByCmdb(instances []interface{},
uClusterType string, uMetaType string) ([]*MongosDetectInstanceInfoFromCmDB, error) {
var (
ret []*MongosDetectInstanceInfoFromCmDB
)
cache := map[string]*MongosDetectInstanceInfoFromCmDB{}

for _, v := range instances {
ins := MongosInstanceInfoDetail{}
rawData, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("marshal instance info failed:%s", err.Error())
}
if err = json.Unmarshal(rawData, &ins); err != nil {
return nil, fmt.Errorf("unmarshal instance info failed:%s", err.Error())
}
if ins.ClusterType != uClusterType || ins.MachineType != uMetaType ||
(ins.Status != constvar.RUNNING && ins.Status != constvar.AVAILABLE) {
continue
}
cacheIns, ok := cache[ins.IP]
//only need detect the minimum port instance
if !ok || ok && ins.Port < cacheIns.Port {
cache[ins.IP] = &MongosDetectInstanceInfoFromCmDB{
Ip: ins.IP,
Port: ins.Port,
App: strconv.Itoa(ins.BKBizID),
ClusterType: ins.ClusterType,
MetaType: ins.MachineType,
Cluster: ins.Cluster,
}
}
}

for _, cacheIns := range cache {
ret = append(ret, cacheIns)
}

return ret, nil
}
183 changes: 183 additions & 0 deletions dbm-services/common/dbha/ha-module/dbmodule/mongodb/mongos_detect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package mongodb

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"regexp"
"time"

"dbm-services/common/dbha/ha-module/config"
"dbm-services/common/dbha/ha-module/constvar"
"dbm-services/common/dbha/ha-module/dbutil"
"dbm-services/common/dbha/ha-module/log"
"dbm-services/common/dbha/ha-module/types"
"dbm-services/common/dbha/ha-module/util"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// MongosDetectInstance mongos instance detect struct
type MongosDetectInstance struct {
dbutil.BaseDetectDB
User string
Pass string
Timeout int
}

var mongoRegex *regexp.Regexp

func init() {
// like 2.4.12 3.0.5 6.0.6
mongoRegex = regexp.MustCompile(`^[0-9]+.[0-9]+.[0-9]+`)
}

// MongosDetectResponse mongos instance response struct
type MongosDetectResponse struct {
dbutil.BaseDetectDBResponse
}

// MongosDetectInstanceInfoFromCmDB mongos instance detect struct in cmdb
type MongosDetectInstanceInfoFromCmDB struct {
Ip string
Port int
App string
ClusterType string
MetaType string
Cluster string
}

// NewMongosDetectInstanceForAgent convert cmdb info to detect info
func NewMongosDetectInstanceForAgent(ins *MongosDetectInstanceInfoFromCmDB,
conf *config.Config) *MongosDetectInstance {
return &MongosDetectInstance{
BaseDetectDB: dbutil.BaseDetectDB{
Ip: ins.Ip,
Port: ins.Port,
App: ins.App,
DBType: types.DBType(ins.ClusterType),
ReporterTime: time.Unix(0, 0),
ReportInterval: conf.AgentConf.ReportInterval + rand.Intn(20),
Status: constvar.DBCheckSuccess,
Cluster: ins.Cluster,
},
Timeout: conf.DBConf.MongoDB.Timeout,
}
}

// NewMongosDetectInstanceForGdm convert api response info into detect info
func NewMongosDetectInstanceForGdm(ins *MongosDetectResponse,
dbType string, conf *config.Config) *MongosDetectInstance {
return &MongosDetectInstance{
BaseDetectDB: dbutil.BaseDetectDB{
Ip: ins.DBIp,
Port: ins.DBPort,
App: ins.App,
DBType: types.DBType(dbType),
ReporterTime: time.Unix(0, 0),
ReportInterval: conf.AgentConf.ReportInterval + rand.Intn(20),
Status: types.CheckStatus(ins.Status),
Cluster: ins.Cluster,
},
Timeout: conf.DBConf.MongoDB.Timeout,
}
}

// Detection mongo 存活探测 PING (5s超时)
func (m *MongosDetectInstance) Detection() (err error) {
if err = m.CheckMongo(); err != nil {
m.Status = constvar.DBCheckFailed
log.Logger.Debugf("mongos check instance failed . %s#%d:%+v", m.Ip, m.Port, err)

if sshErr := m.CheckSSH(); sshErr != nil {
if util.CheckSSHErrIsAuthFail(sshErr) {
m.Status = constvar.SSHAuthFailed
log.Logger.Errorf("mongos check ssh auth failed.ip:%s,port:%d,app:%s,status:%s",
m.Ip, m.Port, m.App, m.Status)
} else {
m.Status = constvar.SSHCheckFailed
log.Logger.Errorf("mongos check ssh failed.ip:%s,port:%d,app:%s,status:%s",
m.Ip, m.Port, m.App, m.Status)
}
return sshErr
}

log.Logger.Debugf("mongos check ssh success. ip:%s, port:%d, app:%s", m.Ip, m.Port, m.App)
m.Status = constvar.SSHCheckSuccess
return nil
}

m.Status = constvar.DBCheckSuccess
log.Logger.Infof("mongos check instance success . %s#%d", m.Ip, m.Port)
return nil
}

// MongosDetectInstance check whether mongo alive
func (m *MongosDetectInstance) CheckMongo() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout)*time.Second)
defer cancel()
// uri := "mongodb://user:****@localhost:27017"
uri := fmt.Sprintf("mongodb://%s:%d", m.Ip, m.Port)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
log.Logger.Warnf("connect mongo failed. %s#%d, err:%s", m.Ip, m.Port, err.Error())
return err
}
// release connection.
defer func() {
if err = client.Disconnect(ctx); err != nil {
log.Logger.Warnf("close mongo connection failed. %s#%d, err:%s", m.Ip, m.Port, err.Error())
}
}()

// if err = client.Ping(ctx, readpref.Primary()); err != nil {
// log.Logger.Errorf("ping mongo [ %s ] failed. ip:%s, port:%d, err:%s", m.Ip, m.Port, err.Error())
// return err
// }
var buildInfoDoc bson.M
if err := client.Database("dbha").RunCommand(ctx,
bson.D{bson.E{Key: "buildInfo", Value: 1}}).Decode(&buildInfoDoc); err != nil {
log.Logger.Errorf("failed to run buildInfo command: %s#%d, err:%+v", m.Ip, m.Port, err)
return err
}

mong_version, _ := buildInfoDoc["version"].(string)
if !mongoRegex.MatchString(mong_version) {
return fmt.Errorf("mongos check failed:%s", mong_version)
}

return nil
}

// Serialization serialize mongos instance info
func (m *MongosDetectInstance) Serialization() ([]byte, error) {
response := MongosDetectResponse{
BaseDetectDBResponse: m.NewDBResponse(),
}

resByte, err := json.Marshal(&response)

if err != nil {
log.Logger.Errorf("mongo serialization failed. err:%s", err.Error())
return []byte{}, err
}

return resByte, nil
}

// CheckSSH mongo do ssh check
func (m *MongosDetectInstance) CheckSSH() error {
touchFile := fmt.Sprintf("%s_%s_%d", m.SshInfo.Dest, "agent", m.Port)

touchStr := fmt.Sprintf("touch %s && if [ -d \"/data1/dbha\" ]; then touch /data1/dbha/%s ; fi "+
"&& if [ -d \"/data/dbha\" ]; then touch /data/dbha/%s ; fi", touchFile, touchFile, touchFile)

if err := m.DoSSH(touchStr); err != nil {
log.Logger.Errorf("MongoDetection do ssh failed. err:%s", err.Error())
return err
}
return nil
}
Loading
Loading