Skip to content

Commit

Permalink
redis refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zakzheng authored and xjxia committed Aug 31, 2023
1 parent e0915d0 commit 1e59f4d
Show file tree
Hide file tree
Showing 33 changed files with 896 additions and 1,235 deletions.
4 changes: 2 additions & 2 deletions dbm-services/common/dbha/ha-module/agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func (gm *GMConnection) Init() error {
}

// ReportInstance agent report instance detect info to gm
func (gm *GMConnection) ReportInstance(dbType string, jsonInfo []byte) error {
func (gm *GMConnection) ReportInstance(detectType string, jsonInfo []byte) error {
var writeBuf string
writeBuf += HEADER
writeBuf += "\r\n"
writeBuf += dbType
writeBuf += detectType
writeBuf += "\r\n"
writeBuf += strconv.Itoa(len(jsonInfo))
writeBuf += "\r\n"
Expand Down
20 changes: 10 additions & 10 deletions dbm-services/common/dbha/ha-module/agent/monitor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type MonitorAgent struct {
City string
Campus string
//detect dbType
DBType string
DetectType string
// agent ip
MonIp string
LastFetchInsTime time.Time
Expand All @@ -41,12 +41,12 @@ type MonitorAgent struct {
}

// NewMonitorAgent new a new agent do detect
func NewMonitorAgent(conf *config.Config, dbType string) (*MonitorAgent, error) {
func NewMonitorAgent(conf *config.Config, detectType string) (*MonitorAgent, error) {
var err error
agent := &MonitorAgent{
City: conf.AgentConf.City,
Campus: conf.AgentConf.Campus,
DBType: dbType,
DetectType: detectType,
LastFetchInsTime: time.Now(),
LastFetchGMTime: time.Now(),
GMInstance: map[string]*GMConnection{},
Expand Down Expand Up @@ -109,7 +109,7 @@ func (a *MonitorAgent) RefreshInstanceCache() {
err := a.FetchDBInstance()
if err != nil {
log.Logger.Errorf("fetch %s instance failed. err:%s",
a.DBType, err.Error())
a.DetectType, err.Error())
}
a.flushInsFetchTime()
}
Expand Down Expand Up @@ -190,9 +190,9 @@ func (a *MonitorAgent) FetchDBInstance() error {

log.Logger.Debugf("fetch db instance info len:%d", len(rawInfo))
// get callback function by db type
cb, ok := dbmodule.DBCallbackMap[types.DBType(a.DBType)]
cb, ok := dbmodule.DBCallbackMap[a.DetectType]
if !ok {
err = fmt.Errorf("can't find fetch %s instance callback", a.DBType)
err = fmt.Errorf("can't find fetch %s instance callback", a.DetectType)
log.Logger.Error(err.Error())
return err
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func (a *MonitorAgent) registerAgentInfoToHaDB() error {
"agent",
a.City,
a.Campus,
a.DBType)
a.DetectType)
if err != nil {
return err
}
Expand All @@ -391,7 +391,7 @@ func (a *MonitorAgent) registerAgentInfoToHaDB() error {
// only detect the minimum port instance, other instances ignore.
func (a *MonitorAgent) moduloHashSharding(allDbInstance []dbutil.DataBaseDetect) (map[string]dbutil.DataBaseDetect,
error) {
mod, modValue, err := a.HaDBClient.AgentGetHashValue(a.MonIp, a.DBType, a.Conf.AgentConf.FetchInterval)
mod, modValue, err := a.HaDBClient.AgentGetHashValue(a.MonIp, a.DetectType, a.Conf.AgentConf.FetchInterval)
if err != nil {
log.Logger.Errorf("get Modulo failed and wait next refresh time. err:%s", err.Error())
return nil, err
Expand All @@ -417,7 +417,7 @@ func (a *MonitorAgent) moduloHashSharding(allDbInstance []dbutil.DataBaseDetect)
// reporterHeartbeat send heartbeat to hadb
func (a *MonitorAgent) reporterHeartbeat() error {
interval := time.Now().Sub(a.heartbeat).Seconds()
err := a.HaDBClient.ReporterAgentHeartbeat(a.DBType, int(interval), "N/A")
err := a.HaDBClient.ReporterAgentHeartbeat(a.DetectType, int(interval), "N/A")
a.heartbeat = time.Now()
return err
}
Expand All @@ -426,7 +426,7 @@ func (a *MonitorAgent) reporterHeartbeat() error {
// only agent trigger double check(report GM) should call this
func (a *MonitorAgent) reporterBindGM(gmInfo string) error {
interval := time.Now().Sub(a.heartbeat).Seconds()
err := a.HaDBClient.ReporterAgentHeartbeat(a.DBType, int(interval), gmInfo)
err := a.HaDBClient.ReporterAgentHeartbeat(a.DetectType, int(interval), gmInfo)
a.heartbeat = time.Now()
return err
}
Expand Down
4 changes: 2 additions & 2 deletions dbm-services/common/dbha/ha-module/client/hadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (c *HaDBClient) GetAliveGMInfo(interval int) ([]GMInfo, error) {
}

// ReporterAgentHeartbeat report agent heartbeat to ha_status table
func (c *HaDBClient) ReporterAgentHeartbeat(dbType string, interval int, gmInfo string) error {
func (c *HaDBClient) ReporterAgentHeartbeat(detectType string, interval int, gmInfo string) error {
var result HaStatusResponse

currentTime := time.Now()
Expand All @@ -462,7 +462,7 @@ func (c *HaDBClient) ReporterAgentHeartbeat(dbType string, interval int, gmInfo
Name: constvar.ReporterAgentHeartbeat,
QueryArgs: &HaStatus{
IP: util.LocalIp,
DbType: dbType,
DbType: detectType,
},
SetArgs: &HaStatus{
ReportInterval: interval,
Expand Down
91 changes: 31 additions & 60 deletions dbm-services/common/dbha/ha-module/client/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,53 +133,29 @@ func (c *NameServiceClient) DeleteDomain(domainName string, app string, ip strin

// PolarisClbGWResp the response format for polaris and clb
type PolarisClbGWResp struct {
Message string `json:"message"`
Status int `json:"status"`
Ips []string `json:"ips,omitempty"`
}

// PolarisClbBodyParseCB the http body process callback for polaris and clb api
func PolarisClbBodyParseCB(b []byte) (interface{}, error) {
result := &PolarisClbGWResp{}
err := json.Unmarshal(b, result)
if err != nil {
log.Logger.Errorf("unmarshall %s to %+v get an error:%s", string(b), *result, err.Error())
return nil, fmt.Errorf("json unmarshal failed, err: %+v", err)
}

// check response and data is nil
if result.Status != statusSuccess {
log.Logger.Errorf("result.Code is %d not equal to %d,message:%s",
result.Status, statusSuccess, result.Message)
return nil, fmt.Errorf("%v - %v", result.Status, result.Message)
}
return result, nil
Ips []string `json:"ips,omitempty"`
}

// ClbDeRegister un-register address to clb
func (c *NameServiceClient) ClbDeRegister(region string, lbid string, listenid string, addr string) error {
func (c *NameServiceClient) ClbDeRegister(
region string, lbid string, listenid string, addr string) error {
req := map[string]interface{}{
"db_cloud_token": c.Conf.BKConf.BkToken,
"bk_cloud_id": c.CloudId,
"region": region,
"loadbalancerid": lbid,
"listenerid": listenid,
"ips": []string{addr},
}

log.Logger.Debugf("ClbDeRegister param:%v", req)
response, err := c.DoNewForCB(http.MethodPost,
response, err := c.DoNew(http.MethodPost,
c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CLBDeRegisterUrl, ""),
req, nil, PolarisClbBodyParseCB)
req, nil)
if err != nil {
log.Logger.Errorf("ClbDeRegister failed,%s", err.Error())
return err
}

gwRsp := response.(*PolarisClbGWResp)
if gwRsp.Status != 0 {
return fmt.Errorf("%s failed, return code:%d, msg:%s",
util.AtWhere(), gwRsp.Status, gwRsp.Message)
}
log.Logger.Debugf("ClbDeRegister:%v", response)
return nil
}

Expand All @@ -188,29 +164,28 @@ func (c *NameServiceClient) ClbGetTargets(
region string, lbid string, listenid string,
) ([]string, error) {
req := map[string]interface{}{
"db_cloud_token": c.Conf.BKConf.BkToken,
"bk_cloud_id": c.CloudId,
"region": region,
"loadbalancerid": lbid,
"listenerid": listenid,
}

log.Logger.Debugf("ClbDeRegister param:%v", req)
response, err := c.DoNewForCB(http.MethodPost,
response, err := c.DoNew(http.MethodPost,
c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CLBGetTargetsUrl, ""),
req, nil, PolarisClbBodyParseCB)
req, nil)
if err != nil {
log.Logger.Errorf("ClbGetTargets failed,%s", err.Error())
return nil, err
}

gwRsp := response.(*PolarisClbGWResp)
if gwRsp.Status != 0 {
gwErr := fmt.Errorf("%s failed, return code:%d, msg:%s",
util.AtWhere(), gwRsp.Status, gwRsp.Message)
return nil, gwErr
log.Logger.Debugf("ClbGet Response:%v", response)
var gwResp PolarisClbGWResp
err = json.Unmarshal(response.Data, &gwResp)
if err != nil {
log.Logger.Errorf("ClbGetTargets failed,%s", err.Error())
return make([]string, 0), err
}

return gwRsp.Ips, nil
return gwResp.Ips, nil
}

// GetPolarisTargets get target address from polaris
Expand All @@ -222,25 +197,26 @@ func (c *NameServiceClient) GetPolarisTargets(servicename string) ([]string, err
}

log.Logger.Debugf("GetPolarisTargets param:%v", req)
response, err := c.DoNewForCB(http.MethodPost,
response, err := c.DoNew(http.MethodPost,
c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.PolarisTargetsUrl, ""),
req, nil, PolarisClbBodyParseCB)
req, nil)
if err != nil {
return nil, err
}

gwRsp := response.(*PolarisClbGWResp)
if gwRsp.Status != 0 {
gwErr := fmt.Errorf("%s failed, return code:%d, msg:%s",
util.AtWhere(), gwRsp.Status, gwRsp.Message)
return nil, gwErr
var gwResp PolarisClbGWResp
err = json.Unmarshal(response.Data, &gwResp)
if err != nil {
log.Logger.Errorf("ClbGetTargets failed,%s", err.Error())
return make([]string, 0), err
}

return gwRsp.Ips, nil
return gwResp.Ips, nil
}

// PolarisUnBindTarget unbind address from polaris
func (c *NameServiceClient) PolarisUnBindTarget(servicename string, servertoken string, addr string) error {
func (c *NameServiceClient) PolarisUnBindTarget(
servicename string, servertoken string, addr string) error {
req := map[string]interface{}{
"db_cloud_token": c.Conf.BKConf.BkToken,
"bk_cloud_id": c.CloudId,
Expand All @@ -250,18 +226,13 @@ func (c *NameServiceClient) PolarisUnBindTarget(servicename string, servertoken
}

log.Logger.Debugf("PolarisUnBindTarget param:%v", req)
response, err := c.DoNewForCB(http.MethodPost,
response, err := c.DoNew(http.MethodPost,
c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.PolarisUnBindUrl, ""),
req, nil, PolarisClbBodyParseCB)
req, nil)
if err != nil {
log.Logger.Errorf("PolarisUnBindTarget failed,%s", err.Error())
return err
}

gwRsp := response.(*PolarisClbGWResp)
if gwRsp.Status != 0 {
return fmt.Errorf("%s failed, return code:%d, msg:%s",
util.AtWhere(), gwRsp.Status, gwRsp.Message)
}

log.Logger.Debugf("PolarisUnBindTarget response:%v", response)
return nil
}
16 changes: 8 additions & 8 deletions dbm-services/common/dbha/ha-module/constvar/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ const (
// DetectTenDBCluster detect TenDBCluster
DetectTenDBCluster = "tendbcluster"

//TendisCache if specified, agent detect would detect RedisCluster's cache
TendisCache = "Rediscache"
//Twemproxy if specified, agent detect would detect RedisCluster's proxy
Twemproxy = "Twemproxy"
// DetectRedis detect tendiscache and twemproxy
DetectRedis = "TwemproxyRedisInstance"
// DetectTendisplus detect tendisplus and predixy
DetectTendisplus = "PredixyTendisplusCluster"

//Predixy if specified, agent detect would detect TendisplusCluster's proxy layer
Predixy = "Predixy"
Expand Down Expand Up @@ -186,13 +186,13 @@ const (
// CmDBEntryDetailUrl TODO
CmDBEntryDetailUrl = "dbmeta/dbha/entry_detail/"
// CLBDeRegisterUrl TODO
CLBDeRegisterUrl = "clb_deregister_part_target/"
CLBDeRegisterUrl = "deregister_part_target/"
// CLBGetTargetsUrl TODO
CLBGetTargetsUrl = "clb_get_target_private_ips/"
CLBGetTargetsUrl = "get_target_private_ips/"
// PolarisTargetsUrl TODO
PolarisTargetsUrl = "polaris_describe_targets/"
PolarisTargetsUrl = "describe_targets/"
// PolarisUnBindUrl TODO
PolarisUnBindUrl = "polaris_unbind_part_targets/"
PolarisUnBindUrl = "unbind_part_targets/"
// BKConfigBatchUrl TODO
BKConfigBatchUrl = "bkconfig/v1/confitem/batchget/"
// BKConfigQueryUrl TODO
Expand Down
12 changes: 6 additions & 6 deletions dbm-services/common/dbha/ha-module/dbha.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ func main() {
switch dbhaType {
case constvar.Agent:
// new agent for each db type
for _, dbType := range conf.AgentConf.ActiveDBType {
go func(dbType string) {
Agent, err := agent.NewMonitorAgent(conf, dbType)
for _, clusterType := range conf.AgentConf.ActiveDBType {
go func(clusterType string) {
Agent, err := agent.NewMonitorAgent(conf, clusterType)
if err != nil {
log.Logger.Fatalf("agent init failed. dbtype:%s err:%s", dbType, err.Error())
log.Logger.Fatalf("agent init failed. clustertype:%s err:%s", clusterType, err.Error())
}

err = Agent.Run()
if err != nil {
log.Logger.Fatalf("agent run failed. dbtype:%s err:%s", dbType, err.Error())
log.Logger.Fatalf("agent run failed. clustertype:%s err:%s", clusterType, err.Error())
}
}(dbType)
}(clusterType)
}
var c chan struct{}
<-c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (ins *MySQLSwitch) ShowSwitchInstanceInfo() string {
ins.Ip, ins.Port, ins.IDC, ins.Role, ins.Status, ins.App, ins.ClusterType,
ins.MetaType)
//TODO right way to check empty?
if ins.StandBySlave != (MySQLSlaveInfo{}) {
if ins.StandBySlave != (dbutil.SlaveInfo{}) {
str = fmt.Sprintf("%s Switch from MASTER:<%s#%d> to SLAVE:<%s#%d>",
str, ins.Ip, ins.Port, ins.StandBySlave.Ip, ins.StandBySlave.Port)
}
Expand All @@ -44,7 +44,7 @@ func (ins *MySQLSwitch) CheckSwitch() (bool, error) {
log.Logger.Infof("info:{%s} is master", ins.ShowSwitchInstanceInfo())

log.Logger.Infof("check slave status. info{%s}", ins.ShowSwitchInstanceInfo())
if ins.StandBySlave == (MySQLSlaveInfo{}) {
if ins.StandBySlave == (dbutil.SlaveInfo{}) {
ins.ReportLogs(constvar.FailResult, "no standby slave info found")
return false, err
}
Expand Down
Loading

0 comments on commit 1e59f4d

Please sign in to comment.