fix: panic on cluster redirection to a node with stale role
Signed-off-by: Rueian <[email protected]>
rueian committed Dec 12, 2024
1 parent 723761f commit e307484
Showing 2 changed files with 18 additions and 93 deletions.
45 changes: 18 additions & 27 deletions cluster.go
@@ -37,9 +37,9 @@ type clusterClient struct {
 
 // NOTE: connrole and conn must be initialized at the same time
 type connrole struct {
-	conn    conn
-	replica bool
-	hidden  bool
+	conn   conn
+	hidden bool
+	//replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372
 }
 
 func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) {
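The struct change above drops the per-connection role cache. A role recorded at refresh time can be contradicted by the live cluster after a failover (and, per the linked valkey issue, a node may even hold mixed roles), so keeping a boolean next to the connection invites stale reads. A minimal, hypothetical illustration of that staleness (not library code):

```go
package main

import "fmt"

// cachedConn is a hypothetical stand-in for a connection whose role was
// snapshotted at the last topology refresh.
type cachedConn struct {
	addr    string
	replica bool
}

func main() {
	// The last refresh saw 127.0.1.1:1 as a replica, so the flag was cached as true.
	cache := map[string]cachedConn{
		"127.0.1.1:1": {addr: "127.0.1.1:1", replica: true},
	}

	// Later the node is promoted to primary; a MOVED redirection legitimately
	// targets it, but the cached flag still claims it is a replica.
	moved := "127.0.1.1:1"
	if cache[moved].replica {
		fmt.Println("stale view:", moved, "is still treated as a replica")
	}
}
```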
@@ -192,12 +192,14 @@ func (c *clusterClient) _refresh() (err error) {
 	groups := result.parse(c.opt.TLSConfig != nil)
 	conns := make(map[string]connrole, len(groups))
 	for master, g := range groups {
-		conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
-		for _, addr := range g.nodes[1:] {
-			if c.rOpt != nil {
-				conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
-			} else {
-				conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
+		conns[master] = connrole{conn: c.connFn(master, c.opt)}
+		if c.rOpt != nil {
+			for _, addr := range g.nodes[1:] {
+				conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)}
+			}
+		} else {
+			for _, addr := range g.nodes[1:] {
+				conns[addr] = connrole{conn: c.connFn(addr, c.opt)}
 			}
 		}
 	}
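For context, the restructured loop above assumes each parsed group maps a primary address to its node list, with nodes[0] being the primary and nodes[1:] its replicas; the replica-specific option is used only when one was configured. A minimal sketch with simplified, assumed types (the library's real group type differs):

```go
package main

import "fmt"

// group is a simplified, assumed shape: nodes[0] is the primary, nodes[1:]
// are its replicas.
type group struct {
	nodes []string
}

func main() {
	groups := map[string]group{
		"127.0.0.1:0": {nodes: []string{"127.0.0.1:0", "127.0.1.1:1"}},
	}
	replicaOptionConfigured := false // stands in for c.rOpt != nil

	for primary, g := range groups {
		fmt.Println("dial primary:", primary)
		if replicaOptionConfigured {
			for _, addr := range g.nodes[1:] {
				fmt.Println("dial replica with the replica option:", addr)
			}
		} else {
			for _, addr := range g.nodes[1:] {
				fmt.Println("dial replica with the default option:", addr)
			}
		}
	}
}
```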
@@ -215,13 +217,9 @@ func (c *clusterClient) _refresh() (err error) {
 
 	c.mu.RLock()
 	for addr, cc := range c.conns {
-		fresh, ok := conns[addr]
-		if ok && (cc.replica == fresh.replica || c.rOpt == nil) {
-			conns[addr] = connrole{
-				conn:    cc.conn,
-				replica: fresh.replica,
-				hidden:  fresh.hidden,
-			}
+		if fresh, ok := conns[addr]; ok {
+			fresh.conn = cc.conn
+			conns[addr] = fresh
 		} else {
 			removes = append(removes, cc.conn)
 		}
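The reuse rule above now keys only on the address: if an address survives the refresh, the established connection object is carried into the fresh entry (which keeps its new metadata such as hidden); otherwise the old connection is queued for removal. A small sketch of that merge, with a string standing in for the real connection object:

```go
package main

import "fmt"

// connrole here uses a string in place of the real connection object.
type connrole struct {
	conn   string
	hidden bool
}

func main() {
	current := map[string]connrole{
		"127.0.0.1:0": {conn: "established-A"},
		"127.0.2.2:2": {conn: "established-B"}, // gone from the new topology
	}
	fresh := map[string]connrole{
		"127.0.0.1:0": {conn: "newly-dialed-A", hidden: true},
	}

	var removes []string
	for addr, cc := range current {
		if f, ok := fresh[addr]; ok {
			f.conn = cc.conn // keep the live connection, adopt the fresh metadata
			fresh[addr] = f
		} else {
			removes = append(removes, cc.conn)
		}
	}
	fmt.Println(fresh)   // map[127.0.0.1:0:{established-A true}]
	fmt.Println(removes) // [established-B]
}
```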
@@ -397,9 +395,6 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
 	c.mu.RLock()
 	if slot == cmds.InitSlot {
 		for _, cc := range c.conns {
-			if cc.replica {
-				continue
-			}
 			p = cc.conn
 			break
 		}
@@ -434,7 +429,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode
 	c.mu.Lock()
 	if cc = c.conns[addr]; cc.conn == nil {
 		p := c.connFn(addr, c.opt)
-		cc = connrole{conn: p, replica: false}
+		cc = connrole{conn: p}
 		c.conns[addr] = cc
 		if mode == RedirectMove {
 			c.pslots[slot] = p
@@ -448,14 +443,10 @@
 			prev.Close()
 		}(prev)
 		p := c.connFn(addr, c.opt)
-		cc = connrole{conn: p, replica: cc.replica}
+		cc = connrole{conn: p}
 		c.conns[addr] = cc
-		if mode == RedirectMove {
-			if cc.replica {
-				c.rslots[slot] = p
-			} else {
-				c.pslots[slot] = p
-			}
+		if mode == RedirectMove { // MOVED should always point to the primary.
+			c.pslots[slot] = p
 		}
 	}
 	c.mu.Unlock()
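The redirectOrNew change above is the core of the fix: a MOVED redirection now always installs the target into the primary slot table, matching the added comment. Under the old code, a connection still cached with replica == true routed the target into c.rslots instead; if that table had not been allocated (presumably it is only built when replica routing is configured), the write would fail at runtime, which lines up with the panic named in the commit title. A minimal, hypothetical illustration of that Go failure mode (not library code):

```go
package main

import "fmt"

func main() {
	// pslots is always allocated; rslots stays nil in this sketch, standing in
	// for a client that never configured replica routing (assumption).
	pslots := make([]string, 16384)
	var rslots []string

	slot, target := 866, "127.0.1.1:1"
	staleReplicaFlag := true // the target was cached as a replica before its promotion

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("old behavior could panic:", r)
		}
	}()

	if staleReplicaFlag {
		rslots[slot] = target // panics: index out of range on a nil slice
	} else {
		pslots[slot] = target
	}
}
```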
66 changes: 0 additions & 66 deletions cluster_test.go
@@ -33,23 +33,6 @@ var slotsResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
 	}},
 }}, nil)
 
-var slotsRespWithChangedRole = newResult(RedisMessage{typ: '*', values: []RedisMessage{
-	{typ: '*', values: []RedisMessage{
-		{typ: ':', integer: 0},
-		{typ: ':', integer: 16383},
-		{typ: '*', values: []RedisMessage{ // master
-			{typ: '+', string: "127.0.1.1"},
-			{typ: ':', integer: 1},
-			{typ: '+', string: ""},
-		}},
-		{typ: '*', values: []RedisMessage{ // replica
-			{typ: '+', string: "127.0.0.1"},
-			{typ: ':', integer: 0},
-			{typ: '+', string: ""},
-		}},
-	}},
-}}, nil)
-
 var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
 	{typ: '*', values: []RedisMessage{
 		{typ: ':', integer: 0},
@@ -5090,55 +5073,6 @@ func TestClusterTopologyRefreshment(t *testing.T) {
 			}
 		}
 	})
-
-	t.Run("node role are changed", func(t *testing.T) {
-		var callCount int64
-		refreshWaitCh := make(chan struct{})
-		cli, err := newClusterClient(
-			&ClientOption{
-				InitAddress: []string{"127.0.0.1:0"},
-				ClusterOption: ClusterOption{
-					ShardsRefreshInterval: time.Second,
-				},
-			},
-			func(dst string, opt *ClientOption) conn {
-				return &mockConn{
-					DoFn: func(cmd Completed) RedisResult {
-						if c := atomic.AddInt64(&callCount, 1); c >= 6 {
-							defer func() { recover() }()
-							defer close(refreshWaitCh)
-							return slotsRespWithChangedRole
-						} else if c >= 3 {
-							return slotsRespWithChangedRole
-						}
-						return slotsResp
-					},
-				}
-			},
-			newRetryer(defaultRetryDelayFn),
-		)
-		if err != nil {
-			t.Fatalf("unexpected err %v", err)
-		}
-
-		select {
-		case <-refreshWaitCh:
-			cli.Close()
-
-			cli.mu.Lock()
-			conns := cli.conns
-			cli.mu.Unlock()
-			if len(conns) != 2 {
-				t.Fatalf("unexpected conns %v", conns)
-			}
-			if cc, ok := conns["127.0.0.1:0"]; !ok || !cc.replica {
-				t.Fatalf("unexpected conns %v", conns)
-			}
-			if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica {
-				t.Fatalf("unexpected conns %v", conns)
-			}
-		}
-	})
 }
 
 func TestClusterClientLoadingRetry(t *testing.T) {