Skip to content

Commit

Permalink
fix(backend): Redis-DBHA-切换修复 #3845
Browse files Browse the repository at this point in the history
  • Loading branch information
xiepaup authored and zhangzhw8 committed Apr 8, 2024
1 parent 70d70a5 commit 360dd92
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
13 changes: 13 additions & 0 deletions dbm-services/common/dbha/ha-module/client/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ func (r *RedisClient) InfoV2(section string) (infoRet map[string]string, err err
return
}

// Get Redis `GET key` command. It returns redis.Nil error when key does not exist.
func (r *RedisClient) Get(key string) (ret string, err error) {
if r.mode != RedisInstance {
ret, err = r.crdb.Get(context.TODO(), key).Result()
} else {
ret, err = r.rdb.Get(context.TODO(), key).Result()
}
if err != nil && err != redis.Nil {
return
}
return ret, nil
}

// SlaveOf TODO
func (r *RedisClient) SlaveOf(host, port string) (ret string, err error) {
if r.mode == RedisInstance {
Expand Down
36 changes: 33 additions & 3 deletions dbm-services/common/dbha/ha-module/dbmodule/redis/redis_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net"
"sort"
"strconv"
Expand Down Expand Up @@ -523,7 +524,8 @@ func (ins *RedisSwitch) GetTwemproxyBackends(ip string, adminPort int) (segs map
// CheckSlaveSyncStatus // 5. 检查同步状态
func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, slaveIp string, slavePort int) error {
slaveAddr, slaveConn := fmt.Sprintf("%s:%d", slaveIp, slavePort), &client.RedisClient{}
slaveConn.Init(slaveAddr, ins.Pass, ins.Timeout, 0)
masterAddr := fmt.Sprintf("%s:%d", masterIp, masterPort)
slaveConn.Init(slaveAddr, ins.Pass, ins.Timeout, 1)
defer slaveConn.Close()

replic, err := slaveConn.InfoV2("replication")
Expand All @@ -547,8 +549,10 @@ func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, sl
return err
}

if replic["master_link_status"] != "up" {
err := fmt.Errorf("unexpected status master_link_status:%s", replic["master_link_status"])
// master_last_io_seconds_ago:-1 master_link_status:down master_link_down_since_seconds:160 master_port:30001
// master_last_io_seconds_ago:114 master_link_status:up master_port:30000 master_repl_offset:2113331 master |
if err := ins.checkReplicationSync(slaveConn, masterAddr, slaveAddr); err != nil {
err := fmt.Errorf("unexpected status master_sync_status:%s", err)
ins.ReportLogs(constvar.FailResult, fmt.Sprintf("redis switch precheck: (%s) : %s", slaveAddr, err.Error()))
return err
}
Expand All @@ -563,6 +567,32 @@ func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, sl
return nil
}

// checkReplicationSync # here we just check the master heartbeat:
func (ins *RedisSwitch) checkReplicationSync(newMasterConn *client.RedisClient,
masterAddr, slaveAddr string) (err error) {
var masterTime, slaveTime int64

rst, err := newMasterConn.Get(fmt.Sprintf("%s:time", masterAddr))
if err != nil {
return fmt.Errorf("[%s]new master node, exec cmd err:%+v", masterAddr, err)
}
if masterTime, err = strconv.ParseInt(rst, 10, 64); err != nil {
return fmt.Errorf("[%s]new master node, time2Int64 err:%+v", masterAddr, err)
}

slaveTime = time.Now().Unix() // here gcs.perl use redis-cli time

slaveMasterDiffTime := math.Abs(float64(slaveTime) - float64(masterTime))
if slaveMasterDiffTime > MaxLastIOSecondsAgo {
return fmt.Errorf("err master slave sync too long %s => %s diff: %.0f(%d)",
masterAddr, slaveAddr, slaveMasterDiffTime, MaxLastIOSecondsAgo)
}

log.Logger.Infof("[%s]new master node, master on slave time:%d, diff:%.0f slave time:%d",
slaveAddr, masterTime, slaveMasterDiffTime, slaveTime)
return nil
}

// IsSlave check instance is slave or not
func (ins *RedisSwitch) IsSlave() bool {
return strings.Contains(ins.Role, "slave")
Expand Down

0 comments on commit 360dd92

Please sign in to comment.