perf: reduce goroutines used by DoMulti/DoMultiCache in a cluster client
Signed-off-by: Rueian <[email protected]>
rueian committed Dec 13, 2024
1 parent 7e284ae commit 112ae7f
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions cluster.go
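
In plain terms, the commit stops spawning one goroutine for every connection that receives a retry batch: the caller pops one batch out of the map, fans the remaining batches out to goroutines, and runs the popped batch on its own goroutine before waiting. Below is a minimal, self-contained sketch of that fan-out idea, using hypothetical task closures rather than rueidis internals.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll runs every batch concurrently but keeps the last one on the calling
// goroutine, so only len(batches)-1 goroutines are spawned per round.
func runAll(batches []func()) {
	if len(batches) == 0 {
		return
	}
	var wg sync.WaitGroup
	wg.Add(len(batches))
	for _, b := range batches[:len(batches)-1] {
		go func(b func()) {
			defer wg.Done()
			b()
		}(b)
	}
	// The final batch reuses the caller's goroutine instead of a new one.
	batches[len(batches)-1]()
	wg.Done()
	wg.Wait()
}

func main() {
	runAll([]func(){
		func() { fmt.Println("commands for node A") },
		func() { fmt.Println("commands for node B") },
		func() { fmt.Println("commands for node C") },
	})
}
```

In this sketch, three batches need only two new goroutines; when a retry round has a single batch, the pattern spawns none at all.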
@@ -528,12 +528,17 @@ func (c *clusterClient) toReplica(cmd Completed) bool {
 
 func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
     last := cmds.InitSlot
-    init := false
+    init := 0
 
     for _, cmd := range multi {
         if cmd.Slot() == cmds.InitSlot {
-            init = true
-            break
+            init++
+            continue
+        }
+        if last == cmds.InitSlot {
+            last = cmd.Slot()
+        } else if init > 0 && last != cmd.Slot() {
+            panic(panicMixCxSlot)
         }
     }
 
@@ -542,18 +547,18 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
 
     count := conncountp.Get(len(c.conns), len(c.conns))
 
-    if !init && c.rslots != nil && c.opt.SendToReplicas != nil {
+    if init == 0 && c.rslots != nil && c.opt.SendToReplicas != nil {
         for _, cmd := range multi {
-            var p conn
+            var cc conn
             if c.opt.SendToReplicas(cmd) {
-                p = c.rslots[cmd.Slot()]
+                cc = c.rslots[cmd.Slot()]
             } else {
-                p = c.pslots[cmd.Slot()]
+                cc = c.pslots[cmd.Slot()]
             }
-            if p == nil {
+            if cc == nil {
                 return nil
             }
-            count.m[p]++
+            count.m[cc]++
         }
 
         retries = connretryp.Get(len(count.m), len(count.m))
@@ -569,25 +574,20 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
             } else {
                 cc = c.pslots[cmd.Slot()]
             }
-
+            if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
+                return nil
+            }
             re := retries.m[cc]
             re.commands = append(re.commands, cmd)
             re.cIndexes = append(re.cIndexes, i)
         }
         return retries
     }
 
-    inits := 0
     for _, cmd := range multi {
         if cmd.Slot() == cmds.InitSlot {
-            inits++
             continue
         }
-        if last == cmds.InitSlot {
-            last = cmd.Slot()
-        } else if init && last != cmd.Slot() {
-            panic(panicMixCxSlot)
-        }
         p := c.pslots[cmd.Slot()]
         if p == nil {
             return nil
@@ -600,7 +600,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
         for i, p := range c.pslots {
             if p != nil {
                 last = uint16(i)
-                count.m[p] = inits
+                count.m[p] = init
                 break
             }
         }
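
The _pickMulti hunks above fold what used to be two passes into one: the first loop now both counts slot-less (InitSlot) commands, turning `init` from a bool into a counter, and performs the cross-slot panic check that previously lived in the later loop, which lets that loop shrink and the old `inits` variable disappear (`count.m[p] = init`). A rough sketch of the single-pass shape follows, with hypothetical names and an assumed sentinel standing in for cmds.InitSlot.

```go
package main

import "fmt"

// initSlot is an assumed sentinel for commands without a key slot
// (e.g. MULTI/EXEC); the real cmds.InitSlot value is not shown in the diff.
const initSlot = ^uint16(0)

// scanSlots counts slot-less commands and panics if keyed commands mix slots
// while slot-less commands are present, all in one pass over the batch.
func scanSlots(slots []uint16) (init int, last uint16) {
	last = initSlot
	for _, s := range slots {
		if s == initSlot {
			init++ // slot-less command
			continue
		}
		if last == initSlot {
			last = s // first keyed slot seen
		} else if init > 0 && s != last {
			panic("cross-slot commands mixed with slot-less commands")
		}
	}
	return init, last
}

func main() {
	init, last := scanSlots([]uint16{initSlot, 42, 42, initSlot})
	fmt.Println(init, last) // 2 42
}
```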
@@ -726,13 +726,22 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
 retry:
     retries.RetryDelay = -1 // Assume no retry. Because client retry flag can be set to false.
 
+    var cc1 conn
+    var re1 *retry
     wg.Add(len(retries.m))
     mu.Lock()
+    for cc, re := range retries.m {
+        delete(retries.m, cc)
+        cc1 = cc
+        re1 = re
+        break
+    }
     for cc, re := range retries.m {
         delete(retries.m, cc)
         go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
     }
     mu.Unlock()
+    c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
     wg.Wait()
 
     if len(retries.m) != 0 {
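
This DoMulti hunk (and the identical DoMultiCache hunk below) realizes the saving: one (conn, retry) pair is popped from retries.m while the mutex is held, goroutines are spawned only for the remaining entries, and the popped pair is executed synchronously on the caller before wg.Wait(). Note that wg.Add(len(retries.m)) is taken before the first delete, so the inline batch is still counted; doretry and doretrycache are presumably responsible for calling wg.Done(). A hedged sketch of the same shape, with plain closures standing in for c.doretry and a string key standing in for conn:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch mirrors the shape of the retry fan-out in the diff: pop one entry
// to run inline, give every other entry its own goroutine, then wait.
func dispatch(work map[string]func()) {
	var wg sync.WaitGroup
	var mu sync.Mutex
	var first func()

	wg.Add(len(work)) // counted before popping, so the inline batch is included
	mu.Lock()
	for k, fn := range work { // pop one arbitrary entry to run inline
		delete(work, k)
		first = fn
		break
	}
	for k, fn := range work { // only the remaining entries get goroutines
		delete(work, k)
		go func(fn func()) {
			defer wg.Done()
			fn()
		}(fn)
	}
	mu.Unlock()
	if first != nil {
		first() // reuse the calling goroutine for the popped entry
		wg.Done()
	}
	wg.Wait()
}

func main() {
	dispatch(map[string]func(){
		"node-a": func() { fmt.Println("retry batch for node A") },
		"node-b": func() { fmt.Println("retry batch for node B") },
		"node-c": func() { fmt.Println("retry batch for node C") },
	})
}
```

In the diff, the same mutex and the retries map are also passed to doretry, presumably so workers can schedule further retries for the next round under the lock, which the `if len(retries.m) != 0` check after wg.Wait() then picks up.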
@@ -997,13 +1006,22 @@ func (c *clusterClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
 retry:
     retries.RetryDelay = -1 // Assume no retry. Because client retry flag can be set to false.
 
+    var cc1 conn
+    var re1 *retrycache
     wg.Add(len(retries.m))
     mu.Lock()
+    for cc, re := range retries.m {
+        delete(retries.m, cc)
+        cc1 = cc
+        re1 = re
+        break
+    }
     for cc, re := range retries.m {
         delete(retries.m, cc)
         go c.doretrycache(ctx, cc, results, retries, re, &mu, &wg, attempts)
     }
     mu.Unlock()
+    c.doretrycache(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
     wg.Wait()
 
     if len(retries.m) != 0 {