Skip to content

Commit

Permalink
fix(dbm-services): DBHA multi-GM may cause repeated switch close Tenc…
Browse files Browse the repository at this point in the history
  • Loading branch information
xjxia authored and hLinx committed Dec 9, 2024
1 parent bb5b02f commit 17f65cf
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 102 deletions.
137 changes: 94 additions & 43 deletions dbm-services/common/dbha/ha-module/agent/monitor_agent.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package agent

import (
"dbm-services/common/dbha/hadb-api/model"
"fmt"
"hash/crc32"
"net"
"sort"
"strconv"
Expand All @@ -16,8 +16,17 @@ import (
"dbm-services/common/dbha/ha-module/dbutil"
"dbm-services/common/dbha/ha-module/log"
"dbm-services/common/dbha/ha-module/monitor"
"dbm-services/common/dbha/hadb-api/model"
)

// CachedHostInfo instance had report to gm
type CachedHostInfo struct {
Ip string
ReporterGMTime time.Time
//interval seconds to expire
ExpireInterval int
}

// MonitorAgent agent work struct
type MonitorAgent struct {
CityID int
Expand All @@ -31,9 +40,12 @@ type MonitorAgent struct {
// mod for current agent
HashMod int
// mod value for current agent
HashValue int
HashValue int
//instances need to detect
DBInstance map[string]dbutil.DataBaseDetect
GMInstance map[string]*GMConnection
//cache for reported GM instances
ReportGMCache map[string]*CachedHostInfo
GMInstance map[string]*GMConnection
// config file
Conf *config.Config
// API client to access cmdb metadata
Expand All @@ -42,6 +54,7 @@ type MonitorAgent struct {
HaDBClient *client.HaDBClient
heartbeat time.Time
MaxConcurrency int // Add this field to store the max concurrency value
Mutex sync.RWMutex
}

// NewMonitorAgent new a new agent do detect
Expand All @@ -54,6 +67,7 @@ func NewMonitorAgent(conf *config.Config, detectType string) (*MonitorAgent, err
LastFetchInsTime: time.Now(),
LastFetchGMTime: time.Now(),
GMInstance: map[string]*GMConnection{},
ReportGMCache: map[string]*CachedHostInfo{},
heartbeat: time.Now(),
Conf: conf,
CmDBClient: client.NewCmDBClient(&conf.DBConf.CMDB, conf.GetCloudId()),
Expand Down Expand Up @@ -122,6 +136,11 @@ func (a *MonitorAgent) RefreshInstanceCache() {
return
}
a.flushInsFetchTime()

// delete reported gm cache
for k, _ := range a.ReportGMCache {
delete(a.ReportGMCache, k)
}
}
}

Expand All @@ -136,14 +155,24 @@ func (a *MonitorAgent) DoDetectSingle(ins dbutil.DataBaseDetect) {
}

a.reportMonitor(ins, err)
if ins.NeedReporter() {
// reporter detect result to hadb
if err = a.ReporterDetectInfo(ins); err != nil {
if ins.NeedReportAgent() {
//only report to HADB's ha_agent_logs
if err = a.HaDBClient.ReportDBStatus(ins.GetApp(), a.MonIp, ip, port,
string(ins.GetDBType()), string(ins.GetStatus()), "N/A"); err != nil {
log.Logger.Errorf(
"reporter hadb instance status failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, ins.GetDBType(), ins.GetStatus())
}
ins.UpdateReporterTime()
ins.UpdateReportTime()
}

if a.NeedReportGM(ins) {
// reporter detect result to GM
if err = a.ReportDetectInfoToGM(ins); err != nil {
log.Logger.Errorf(
"reporter instance info to gm failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, ins.GetDBType(), ins.GetStatus())
}
}
}

Expand All @@ -165,16 +194,16 @@ func (a *MonitorAgent) RefreshGMCache() {
a.flushGmFetchTime()
}

a.Mutex.Lock()
defer a.Mutex.Unlock()
for ip, ins := range a.GMInstance {
ins.Mutex.Lock()
anHour := time.Now().Add(-60 * time.Minute)
anHour := time.Now().Add(-30 * time.Minute)
// connect leak?
if ins.LastFetchTime.Before(anHour) {
ins.IsClose = true
log.Logger.Infof("gm:%s de-cached", ip)
delete(a.GMInstance, ip)
}
ins.Mutex.Unlock()
}

// we not return error here, next refresh, new added gm maybe available.
Expand Down Expand Up @@ -261,6 +290,9 @@ func (a *MonitorAgent) FetchGMInstance() error {
return err
}

a.Mutex.Lock()
defer a.Mutex.Unlock()

for _, info := range gmInfo {
if info.CityID == a.CityID || info.CloudID != a.Conf.AgentConf.CloudID {
continue
Expand Down Expand Up @@ -290,32 +322,48 @@ func (a *MonitorAgent) FetchGMInstance() error {
return nil
}

// ReporterDetectInfo report detect info to gm
func (a *MonitorAgent) ReporterDetectInfo(reporterInstance dbutil.DataBaseDetect) error {
var err error
isReporter := false
ip, port := reporterInstance.GetAddress()
if reporterInstance.GetStatus() == constvar.DBCheckSuccess ||
reporterInstance.GetStatus() == constvar.SSHCheckSuccess {
if err = a.HaDBClient.ReportDBStatus(reporterInstance.GetApp(), a.MonIp, ip, port,
string(reporterInstance.GetDBType()), string(reporterInstance.GetStatus()), "N/A"); err != nil {
log.Logger.Errorf(
"reporter hadb instance status failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, reporterInstance.GetDBType(), reporterInstance.GetStatus())
// NeedReportGM report detect info to GM module
func (a *MonitorAgent) NeedReportGM(ins dbutil.DataBaseDetect) bool {
ip, _ := ins.GetAddress()
if ins.GetStatus() == constvar.DBCheckSuccess ||
ins.GetStatus() == constvar.SSHCheckSuccess {
return false
}

if _, ok := a.ReportGMCache[ip]; ok {
cachedIns := a.ReportGMCache[ip]
now := time.Now()
if now.Before(cachedIns.ReporterGMTime.Add(time.Second * time.Duration(cachedIns.ExpireInterval))) {
return false
}
return nil
}

return true
}

// ReportDetectInfoToGM report detect info to HADB, maybe gm also
func (a *MonitorAgent) ReportDetectInfoToGM(reporterInstance dbutil.DataBaseDetect) error {
var err error
isReported := false
ip, port := reporterInstance.GetAddress()

// 提取 GMInstance 的 IP 列表
var gmIPs []string
a.Mutex.RLock()
for gmIP := range a.GMInstance {
gmIPs = append(gmIPs, gmIP)
}
a.Mutex.RUnlock()

// 按照 IP 字典顺序排序
sort.Strings(gmIPs)
hashValue := crc32.ChecksumIEEE([]byte(ip))
hashIndex := int(hashValue) % len(gmIPs)

for _, sortedIp := range gmIPs {
for i := 0; i < len(gmIPs); i++ {
//每次遍历时,都是从下一个偏移开始,确保在最差情况下能遍历完所有的gm
checkIndex := (hashIndex + i) % len(gmIPs)
sortedIp := gmIPs[checkIndex]
gmIns := a.GMInstance[sortedIp]
gmIns.Mutex.Lock()
if !gmIns.IsConnection {
Expand All @@ -334,30 +382,34 @@ func (a *MonitorAgent) ReporterDetectInfo(reporterInstance dbutil.DataBaseDetect
if err != nil {
log.Logger.Warnf("reporter gm failed. gm_ip:%s, gm_port:%d, err:%s", gmIns.Ip, gmIns.Port, err.Error())
gmIns.IsConnection = false
err = a.RepairGM(gmIns)
if err != nil {
log.Logger.Errorf("Repair gm failed:%s", err.Error())
return err
}
gmIns.Mutex.Unlock()
a.RepairGMConnection(gmIns)
//do retry
continue
} else {
isReported = true
gmIns.Mutex.Unlock()
a.ReportGMCache[ip] = &CachedHostInfo{
ReporterGMTime: time.Now(),
Ip: ip,
ExpireInterval: a.Conf.AgentConf.ReportInterval,
}
//report bind gmInfo to ha_agent_logs
if err = a.HaDBClient.ReportDBStatus(reporterInstance.GetApp(), a.MonIp, ip, port,
string(reporterInstance.GetDBType()), string(reporterInstance.GetStatus()), gmInfo); err != nil {
log.Logger.Errorf(
"reporter hadb instance status failed. err:%s, ip:%s, port:%d, db_type:%s, status:%s",
err.Error(), ip, port, reporterInstance.GetDBType(), reporterInstance.GetStatus())
log.Logger.Warnf(
"instance[%s#%d] reporter bind gm info[%s] failed:%s", ip, port, gmInfo, err.Error())
}
isReporter = true
gmIns.Mutex.Unlock()
break
}
gmIns.Mutex.Unlock()
}

if !isReporter {
if !isReported {
err = fmt.Errorf("get report GM failed: all gm disconnect")
log.Logger.Error(err.Error())
return err
}

return nil
}

Expand All @@ -381,32 +433,31 @@ func (a *MonitorAgent) flushGmFetchTime() {
a.LastFetchGMTime = time.Now()
}

// RepairGM if conn break, do reconnect
func (a *MonitorAgent) RepairGM(gmIns *GMConnection) error {
// RepairGMConnection if conn break, do reconnect
func (a *MonitorAgent) RepairGMConnection(gmIns *GMConnection) {
go func(gmIns *GMConnection) {
for {
gmIns.Mutex.Lock()
if gmIns.IsClose {
if gmIns.IsClose || gmIns.IsConnection {
gmIns.Mutex.Unlock()
return
}
address := gmIns.Ip + ":" + strconv.Itoa(gmIns.Port)
conn, err := net.Dial("tcp", address)
if err != nil {
log.Logger.Warn(
"RepairGM: ip:", gmIns.Ip, " port:", gmIns.Port, " connect failed, err:", err.Error())
"RepairGMConnection: ip:", gmIns.Ip, " port:", gmIns.Port, " connect failed, err:", err.Error())
} else {
gmIns.NetConnection = conn
gmIns.IsConnection = true
log.Logger.Info("RepairGM: ip:", gmIns.Ip, " port:", gmIns.Port, " connect success.")
log.Logger.Info("RepairGMConnection: ip:", gmIns.Ip, " port:", gmIns.Port, " connect success.")
gmIns.Mutex.Unlock()
return
}
gmIns.Mutex.Unlock()
time.Sleep(10 * time.Second)
time.Sleep(5 * time.Second)
}
}(gmIns)
return nil
}

// registerAgentInfoToHaDB register current agent info
Expand Down
2 changes: 1 addition & 1 deletion dbm-services/common/dbha/ha-module/client/hadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (c *HaDBClient) ReportDBStatus(app, agentIp, ip string, port int, dbType, s
return nil
}

// ReportHaLogRough report ha logs
// ReportHaLogRough report ha logs without return
func (c *HaDBClient) ReportHaLogRough(monIP, app, ip string, port int, module, comment string) {
_, _ = c.ReportHaLog(monIP, app, ip, port, module, comment)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (ins *MySQLCommonSwitch) CheckSlaveStatus() error {
slaveDelay, gmConf.GCM.AllowedSlaveDelayMax))

if timeDelay >= gmConf.GCM.AllowedTimeDelayMax {
return fmt.Errorf("IO_Thread delay on slave too large than master(%d >= %d)", timeDelay,
return fmt.Errorf("heartbeat delay on slave too large than master(%d >= %d)", timeDelay,
gmConf.GCM.AllowedTimeDelayMax)
}
ins.ReportLogs(constvar.InfoResult, fmt.Sprintf("IO_Thread delay [%d] in allowed range[%d]",
Expand Down Expand Up @@ -526,7 +526,7 @@ func (ins *MySQLCommonSwitch) CheckSlaveSlow(ignoreDelay bool) error {
}

binlogSizeMByte := maxBinlogSize.VariableValue / (1024 * 1024)
log.Logger.Infof("the slave max_binlog_size value is %d M!", binlogSizeMByte)
log.Logger.Infof("the slave max_binlog_size value is %dM!", binlogSizeMByte)

status, err := GetSlaveStatus(db)
if err != nil {
Expand Down
Loading

0 comments on commit 17f65cf

Please sign in to comment.