diff --git a/dbm-services/common/dbha/ha-module/client/redis_client.go b/dbm-services/common/dbha/ha-module/client/redis_client.go index 569202aa1d..4bac93e77b 100644 --- a/dbm-services/common/dbha/ha-module/client/redis_client.go +++ b/dbm-services/common/dbha/ha-module/client/redis_client.go @@ -147,6 +147,19 @@ func (r *RedisClient) InfoV2(section string) (infoRet map[string]string, err err return } +// Get Redis `GET key` command. It returns redis.Nil error when key does not exist. +func (r *RedisClient) Get(key string) (ret string, err error) { + if r.mode != RedisInstance { + ret, err = r.crdb.Get(context.TODO(), key).Result() + } else { + ret, err = r.rdb.Get(context.TODO(), key).Result() + } + if err != nil && err != redis.Nil { + return + } + return ret, nil +} + // SlaveOf TODO func (r *RedisClient) SlaveOf(host, port string) (ret string, err error) { if r.mode == RedisInstance { diff --git a/dbm-services/common/dbha/ha-module/dbmodule/redis/redis_switch.go b/dbm-services/common/dbha/ha-module/dbmodule/redis/redis_switch.go index 411662c18a..ff45cdd0fc 100644 --- a/dbm-services/common/dbha/ha-module/dbmodule/redis/redis_switch.go +++ b/dbm-services/common/dbha/ha-module/dbmodule/redis/redis_switch.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net" "sort" "strconv" @@ -523,7 +524,8 @@ func (ins *RedisSwitch) GetTwemproxyBackends(ip string, adminPort int) (segs map // CheckSlaveSyncStatus // 5. 检查同步状态 func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, slaveIp string, slavePort int) error { slaveAddr, slaveConn := fmt.Sprintf("%s:%d", slaveIp, slavePort), &client.RedisClient{} - slaveConn.Init(slaveAddr, ins.Pass, ins.Timeout, 0) + masterAddr := fmt.Sprintf("%s:%d", masterIp, masterPort) + slaveConn.Init(slaveAddr, ins.Pass, ins.Timeout, 1) defer slaveConn.Close() replic, err := slaveConn.InfoV2("replication") @@ -547,8 +549,10 @@ func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, sl return err } - if replic["master_link_status"] != "up" { - err := fmt.Errorf("unexpected status master_link_status:%s", replic["master_link_status"]) + // master_last_io_seconds_ago:-1 master_link_status:down master_link_down_since_seconds:160 master_port:30001 + // master_last_io_seconds_ago:114 master_link_status:up master_port:30000 master_repl_offset:2113331 master | + if err := ins.checkReplicationSync(slaveConn, masterAddr, slaveAddr); err != nil { + err := fmt.Errorf("unexpected status master_sync_status:%s", err) ins.ReportLogs(constvar.FailResult, fmt.Sprintf("redis switch precheck: (%s) : %s", slaveAddr, err.Error())) return err } @@ -563,6 +567,32 @@ func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, sl return nil } +// checkReplicationSync # here we just check the master heartbeat: +func (ins *RedisSwitch) checkReplicationSync(newMasterConn *client.RedisClient, + masterAddr, slaveAddr string) (err error) { + var masterTime, slaveTime int64 + + rst, err := newMasterConn.Get(fmt.Sprintf("%s:time", masterAddr)) + if err != nil { + return fmt.Errorf("[%s]new master node, exec cmd err:%+v", masterAddr, err) + } + if masterTime, err = strconv.ParseInt(rst, 10, 64); err != nil { + return fmt.Errorf("[%s]new master node, time2Int64 err:%+v", masterAddr, err) + } + + slaveTime = time.Now().Unix() // here gcs.perl use redis-cli time + + slaveMasterDiffTime := math.Abs(float64(slaveTime) - float64(masterTime)) + if slaveMasterDiffTime > MaxLastIOSecondsAgo { + return fmt.Errorf("err master slave sync too long %s => %s diff: %.0f(%d)", + masterAddr, slaveAddr, slaveMasterDiffTime, MaxLastIOSecondsAgo) + } + + log.Logger.Infof("[%s]new master node, master on slave time:%d, diff:%.0f slave time:%d", + slaveAddr, masterTime, slaveMasterDiffTime, slaveTime) + return nil +} + // IsSlave check instance is slave or not func (ins *RedisSwitch) IsSlave() bool { return strings.Contains(ins.Role, "slave")