diff --git a/cluster.go b/cluster.go index cb1a76ad..8babd083 100644 --- a/cluster.go +++ b/cluster.go @@ -37,9 +37,9 @@ type clusterClient struct { // NOTE: connrole and conn must be initialized at the same time type connrole struct { - conn conn - replica bool - hidden bool + conn conn + hidden bool + //replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372 } func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) { @@ -192,12 +192,14 @@ func (c *clusterClient) _refresh() (err error) { groups := result.parse(c.opt.TLSConfig != nil) conns := make(map[string]connrole, len(groups)) for master, g := range groups { - conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false} - for _, addr := range g.nodes[1:] { - if c.rOpt != nil { - conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true} - } else { - conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true} + conns[master] = connrole{conn: c.connFn(master, c.opt)} + if c.rOpt != nil { + for _, addr := range g.nodes[1:] { + conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)} + } + } else { + for _, addr := range g.nodes[1:] { + conns[addr] = connrole{conn: c.connFn(addr, c.opt)} } } } @@ -215,13 +217,9 @@ func (c *clusterClient) _refresh() (err error) { c.mu.RLock() for addr, cc := range c.conns { - fresh, ok := conns[addr] - if ok && (cc.replica == fresh.replica || c.rOpt == nil) { - conns[addr] = connrole{ - conn: cc.conn, - replica: fresh.replica, - hidden: fresh.hidden, - } + if fresh, ok := conns[addr]; ok { + fresh.conn = cc.conn + conns[addr] = fresh } else { removes = append(removes, cc.conn) } @@ -397,9 +395,6 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) { c.mu.RLock() if slot == cmds.InitSlot { for _, cc := range c.conns { - if cc.replica { - continue - } p = cc.conn break } @@ -434,7 +429,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode c.mu.Lock() if cc = c.conns[addr]; cc.conn == nil { p := c.connFn(addr, c.opt) - cc = connrole{conn: p, replica: false} + cc = connrole{conn: p} c.conns[addr] = cc if mode == RedirectMove { c.pslots[slot] = p @@ -448,14 +443,10 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode prev.Close() }(prev) p := c.connFn(addr, c.opt) - cc = connrole{conn: p, replica: cc.replica} + cc = connrole{conn: p} c.conns[addr] = cc - if mode == RedirectMove { - if cc.replica { - c.rslots[slot] = p - } else { - c.pslots[slot] = p - } + if mode == RedirectMove { // MOVED should always point to the primary. + c.pslots[slot] = p } } c.mu.Unlock() diff --git a/cluster_test.go b/cluster_test.go index 1daf87fd..36e37219 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -33,23 +33,6 @@ var slotsResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ }}, }}, nil) -var slotsRespWithChangedRole = newResult(RedisMessage{typ: '*', values: []RedisMessage{ - {typ: '*', values: []RedisMessage{ - {typ: ':', integer: 0}, - {typ: ':', integer: 16383}, - {typ: '*', values: []RedisMessage{ // master - {typ: '+', string: "127.0.1.1"}, - {typ: ':', integer: 1}, - {typ: '+', string: ""}, - }}, - {typ: '*', values: []RedisMessage{ // replica - {typ: '+', string: "127.0.0.1"}, - {typ: ':', integer: 0}, - {typ: '+', string: ""}, - }}, - }}, -}}, nil) - var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ {typ: '*', values: []RedisMessage{ {typ: ':', integer: 0}, @@ -1937,13 +1920,6 @@ func TestClusterClient_SendToOnlyPrimaryNodes(t *testing.T) { t.Fatalf("unexpected err %v", err) } - t.Run("Do with no slot", func(t *testing.T) { - c := client.B().Info().Build() - if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" { - t.Fatalf("unexpected response %v %v", v, err) - } - }) - t.Run("Do", func(t *testing.T) { c := client.B().Get().Key("Do").Build() if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { @@ -5090,55 +5066,6 @@ func TestClusterTopologyRefreshment(t *testing.T) { } } }) - - t.Run("node role are changed", func(t *testing.T) { - var callCount int64 - refreshWaitCh := make(chan struct{}) - cli, err := newClusterClient( - &ClientOption{ - InitAddress: []string{"127.0.0.1:0"}, - ClusterOption: ClusterOption{ - ShardsRefreshInterval: time.Second, - }, - }, - func(dst string, opt *ClientOption) conn { - return &mockConn{ - DoFn: func(cmd Completed) RedisResult { - if c := atomic.AddInt64(&callCount, 1); c >= 6 { - defer func() { recover() }() - defer close(refreshWaitCh) - return slotsRespWithChangedRole - } else if c >= 3 { - return slotsRespWithChangedRole - } - return slotsResp - }, - } - }, - newRetryer(defaultRetryDelayFn), - ) - if err != nil { - t.Fatalf("unexpected err %v", err) - } - - select { - case <-refreshWaitCh: - cli.Close() - - cli.mu.Lock() - conns := cli.conns - cli.mu.Unlock() - if len(conns) != 2 { - t.Fatalf("unexpected conns %v", conns) - } - if cc, ok := conns["127.0.0.1:0"]; !ok || !cc.replica { - t.Fatalf("unexpected conns %v", conns) - } - if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica { - t.Fatalf("unexpected conns %v", conns) - } - } - }) } func TestClusterClientLoadingRetry(t *testing.T) {