feat: replica selector (#692)
* feat: add reader node selector

* doc: add comments

* fix: wrong error message

* test: add send read-only command to reader node

* refactor: use sendtoreplicas and replicaselector

* test: remove duplicated cases

* test: remove cases

* perf: introduce nodes
proost authored Dec 13, 2024
1 parent 383a2a1 commit 29524d2
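
The commit introduces a ReplicaSelector option on ClientOption: when SendToReplicas routes a command to a replica, the selector decides which replica serves a given slot, replacing the previous unconditional random pick. A minimal usage sketch (ClientOption, SendToReplicas, ReplicaSelector, and ReplicaInfo appear in this diff; the client looks like rueidis, so the import path, NewClient, InitAddress, Completed, and IsReadOnly are assumptions taken from that library rather than from this commit):

package main

import "github.com/redis/rueidis" // assumed import path

func newReaderAwareClient() (rueidis.Client, error) {
	return rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7001"},
		// Pre-existing option: route read-only commands to replica nodes.
		SendToReplicas: func(cmd rueidis.Completed) bool {
			return cmd.IsReadOnly()
		},
		// New in this commit: pick which replica serves each slot.
		// An out-of-range return falls back to the master for that slot.
		ReplicaSelector: func(slot uint16, replicas []rueidis.ReplicaInfo) int {
			return 0 // always the first listed replica
		},
	})
}

Per the validation added below, ReplicaSelector requires SendToReplicas (otherwise ErrSendToReplicasNotSet) and conflicts with ReplicaOnly (ErrReplicaOnlyConflictWithReplicaSelector); when SendToReplicas is set without a selector, the random replicaOnlySelector is installed as the default.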
Showing 4 changed files with 1,454 additions and 33 deletions.
61 changes: 43 additions & 18 deletions cluster.go
@@ -18,6 +18,8 @@ import (
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0")
+var ErrReplicaOnlyConflictWithReplicaSelector = errors.New("ReplicaOnly conflicts with ReplicaSelector option")
+var ErrSendToReplicasNotSet = errors.New("SendToReplicas must be set when ReplicaSelector is set")

type clusterClient struct {
pslots [16384]conn
@@ -42,6 +44,10 @@ type connrole struct {
//replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372
}

+var replicaOnlySelector = func(_ uint16, replicas []ReplicaInfo) int {
+return util.FastRand(len(replicas))
+}

func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) {
client := &clusterClient{
cmd: cmds.NewBuilder(cmds.InitSlot),
@@ -56,6 +62,16 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) {
if opt.ReplicaOnly && opt.SendToReplicas != nil {
return nil, ErrReplicaOnlyConflict
}
+if opt.ReplicaOnly && opt.ReplicaSelector != nil {
+return nil, ErrReplicaOnlyConflictWithReplicaSelector
+}
+if opt.ReplicaSelector != nil && opt.SendToReplicas == nil {
+return nil, ErrSendToReplicasNotSet
+}
+
+if opt.SendToReplicas != nil && opt.ReplicaSelector == nil {
+opt.ReplicaSelector = replicaOnlySelector
+}

if opt.SendToReplicas != nil {
rOpt := *opt
@@ -194,12 +210,12 @@ func (c *clusterClient) _refresh() (err error) {
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt)}
if c.rOpt != nil {
-for _, addr := range g.nodes[1:] {
-conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)}
+for _, nodeInfo := range g.nodes[1:] {
+conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.rOpt)}
}
} else {
-for _, addr := range g.nodes[1:] {
-conns[addr] = connrole{conn: c.connFn(addr, c.opt)}
+for _, nodeInfo := range g.nodes[1:] {
+conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.opt)}
}
}
}
@@ -234,18 +250,25 @@ func (c *clusterClient) _refresh() (err error) {
nodesCount := len(g.nodes)
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
-pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn
+pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
}
}
-case c.rOpt != nil: // implies c.opt.SendToReplicas != nil
+case c.rOpt != nil:
if len(rslots) == 0 { // lazy init
rslots = make([]conn, 16384)
}
if len(g.nodes) > 1 {
+n := len(g.nodes) - 1
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
pslots[i] = conns[master].conn
-rslots[i] = conns[g.nodes[1+util.FastRand(len(g.nodes)-1)]].conn
+
+rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
+if rIndex >= 0 && rIndex < n {
+rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
+} else {
+rslots[i] = conns[master].conn
+}
}
}
} else {
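
Note the contract in the hunk above: ReplicaSelector may return any int, and any index outside [0, n) quietly routes that slot's reads back to the master connection, so -1 works as an explicit "use the master" escape hatch. A hypothetical locality-biased selector (the subnet prefix, helper name, and import path are illustrative, not part of this commit):

import (
	"strings"

	"github.com/redis/rueidis" // assumed import path
)

// localFirst returns the first replica in a local subnet; -1 is deliberately
// out of range, so reads for that slot fall back to the master.
func localFirst(_ uint16, replicas []rueidis.ReplicaInfo) int {
	for i, r := range replicas {
		if strings.HasPrefix(r.Addr, "10.0.1.") {
			return i
		}
	}
	return -1
}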
@@ -297,8 +320,10 @@ func (c *clusterClient) nodes() []string {
return nodes
}

+type nodes []ReplicaInfo

type group struct {
-nodes []string
+nodes nodes
slots [][2]int64
}

@@ -324,10 +349,10 @@ func parseSlots(slots RedisMessage, defaultAddr string) map[string]group {
g, ok := groups[master]
if !ok {
g.slots = make([][2]int64, 0)
-g.nodes = make([]string, 0, len(v.values)-2)
+g.nodes = make(nodes, 0, len(v.values)-2)
for i := 2; i < len(v.values); i++ {
if dst := parseEndpoint(defaultAddr, v.values[i].values[0].string, v.values[i].values[1].integer); dst != "" {
-g.nodes = append(g.nodes, dst)
+g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
}
}
}
@@ -345,16 +370,16 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]group {
m := -1
shard, _ := v.AsMap()
slots := shard["slots"].values
-nodes := shard["nodes"].values
+_nodes := shard["nodes"].values
g := group{
-nodes: make([]string, 0, len(nodes)),
+nodes: make(nodes, 0, len(_nodes)),
slots: make([][2]int64, len(slots)/2),
}
for i := range g.slots {
g.slots[i][0], _ = slots[i*2].AsInt64()
g.slots[i][1], _ = slots[i*2+1].AsInt64()
}
-for _, n := range nodes {
+for _, n := range _nodes {
dict, _ := n.AsMap()
if dict["health"].string != "online" {
continue
@@ -367,12 +392,12 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]group {
if dict["role"].string == "master" {
m = len(g.nodes)
}
-g.nodes = append(g.nodes, dst)
+g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
}
}
if m >= 0 {
g.nodes[0], g.nodes[m] = g.nodes[m], g.nodes[0]
-groups[g.nodes[0]] = g
+groups[g.nodes[0].Addr] = g
}
}
return groups
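
Both parsers keep the master at g.nodes[0] (parseSlots appends it first; parseShards swaps it into place above), so the g.nodes[1:] slice handed to ReplicaSelector contains only replicas, with replicas[i] mapping to g.nodes[1+i] internally. Since the selector is only invoked when a shard has at least one replica, a deterministic per-slot spread like the following sketch never divides by zero (the function name and import path are illustrative):

import "github.com/redis/rueidis" // assumed import path

// perSlot pins every slot to one replica, so repeated reads of the same key
// land on the same node; len(replicas) > 0 whenever the selector is called.
func perSlot(slot uint16, replicas []rueidis.ReplicaInfo) int {
	return int(slot) % len(replicas)
}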
@@ -1078,15 +1103,15 @@ func (c *clusterClient) Dedicate() (DedicatedClient, func()) {

func (c *clusterClient) Nodes() map[string]Client {
c.mu.RLock()
-nodes := make(map[string]Client, len(c.conns))
+_nodes := make(map[string]Client, len(c.conns))
disableCache := c.opt != nil && c.opt.DisableCache
for addr, cc := range c.conns {
if !cc.hidden {
-nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
+_nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
}
}
c.mu.RUnlock()
-return nodes
+return _nodes
}

func (c *clusterClient) Close() {