diff --git a/cluster.go b/cluster.go index e5a8bb2d..42b61036 100644 --- a/cluster.go +++ b/cluster.go @@ -19,56 +19,6 @@ import ( var ErrNoSlot = errors.New("the slot has no redis node") var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option") -type retry struct { - cIndexes []int - commands []Completed - aIndexes []int - cAskings []Completed -} - -func (r *retry) Capacity() int { - return cap(r.commands) -} - -func (r *retry) ResetLen(n int) { - r.cIndexes = r.cIndexes[:n] - r.commands = r.commands[:n] - r.aIndexes = r.aIndexes[:0] - r.cAskings = r.cAskings[:0] -} - -var retryp = util.NewPool(func(capacity int) *retry { - return &retry{ - cIndexes: make([]int, 0, capacity), - commands: make([]Completed, 0, capacity), - } -}) - -type retrycache struct { - cIndexes []int - commands []CacheableTTL - aIndexes []int - cAskings []CacheableTTL -} - -func (r *retrycache) Capacity() int { - return cap(r.commands) -} - -func (r *retrycache) ResetLen(n int) { - r.cIndexes = r.cIndexes[:n] - r.commands = r.commands[:n] - r.aIndexes = r.aIndexes[:0] - r.cAskings = r.cAskings[:0] -} - -var retrycachep = util.NewPool(func(capacity int) *retrycache { - return &retrycache{ - cIndexes: make([]int, 0, capacity), - commands: make([]CacheableTTL, 0, capacity), - } -}) - type clusterClient struct { pslots [16384]conn rslots []conn @@ -1285,60 +1235,3 @@ const ( panicMsgCxSlot = "cross slot command in Dedicated is prohibited" panicMixCxSlot = "Mixing no-slot and cross slot commands in DoMulti is prohibited" ) - -type conncount struct { - m map[conn]int - n int -} - -func (r *conncount) Capacity() int { - return r.n -} - -func (r *conncount) ResetLen(n int) { - for k := range r.m { - delete(r.m, k) - } -} - -var conncountp = util.NewPool(func(capacity int) *conncount { - return &conncount{m: make(map[conn]int, capacity), n: capacity} -}) - -type connretry struct { - m map[conn]*retry - n int -} - -func (r *connretry) Capacity() int { - return r.n -} - -func (r *connretry) ResetLen(n int) { - for k := range r.m { - delete(r.m, k) - } -} - -var connretryp = util.NewPool(func(capacity int) *connretry { - return &connretry{m: make(map[conn]*retry, capacity), n: capacity} -}) - -type connretrycache struct { - m map[conn]*retrycache - n int -} - -func (r *connretrycache) Capacity() int { - return r.n -} - -func (r *connretrycache) ResetLen(n int) { - for k := range r.m { - delete(r.m, k) - } -} - -var connretrycachep = util.NewPool(func(capacity int) *connretrycache { - return &connretrycache{m: make(map[conn]*retrycache, capacity), n: capacity} -}) diff --git a/helper.go b/helper.go index 70339134..3d4c510d 100644 --- a/helper.go +++ b/helper.go @@ -6,41 +6,8 @@ import ( "time" intl "github.com/redis/rueidis/internal/cmds" - "github.com/redis/rueidis/internal/util" ) -type mgetcachecmds struct { - s []CacheableTTL -} - -func (r *mgetcachecmds) Capacity() int { - return cap(r.s) -} - -func (r *mgetcachecmds) ResetLen(n int) { - r.s = r.s[:n] -} - -var mgetcachecmdsp = util.NewPool(func(capacity int) *mgetcachecmds { - return &mgetcachecmds{s: make([]CacheableTTL, 0, capacity)} -}) - -type mgetcmds struct { - s []Completed -} - -func (r *mgetcmds) Capacity() int { - return cap(r.s) -} - -func (r *mgetcmds) ResetLen(n int) { - r.s = r.s[:n] -} - -var mgetcmdsp = util.NewPool(func(capacity int) *mgetcmds { - return &mgetcmds{s: make([]Completed, 0, capacity)} -}) - // MGetCache is a helper that consults the client-side caches with multiple keys by grouping keys within same slot into multiple GETs func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []string) (ret map[string]RedisMessage, err error) { if len(keys) == 0 { diff --git a/internal/cmds/builder_put.go b/internal/cmds/builder_put.go index fded6a32..d191724f 100644 --- a/internal/cmds/builder_put.go +++ b/internal/cmds/builder_put.go @@ -1,11 +1,7 @@ -//go:build !go1.21 - package cmds func Put(cs *CommandSlice) { - for i := range cs.s { - cs.s[i] = "" - } + clear(cs.s) cs.s = cs.s[:0] cs.l = -1 cs.r = 0 diff --git a/internal/cmds/builder_put_121.go b/internal/cmds/builder_put_121.go deleted file mode 100644 index 45615ed1..00000000 --- a/internal/cmds/builder_put_121.go +++ /dev/null @@ -1,11 +0,0 @@ -//go:build go1.21 - -package cmds - -func Put(cs *CommandSlice) { - clear(cs.s) - cs.s = cs.s[:0] - cs.l = -1 - cs.r = 0 - pool.Put(cs) -} diff --git a/internal/util/pool.go b/internal/util/pool.go index fd251fb2..fe676ff0 100644 --- a/internal/util/pool.go +++ b/internal/util/pool.go @@ -2,7 +2,6 @@ package util import ( "sync" - "sync/atomic" ) type Container interface { @@ -11,23 +10,19 @@ type Container interface { } func NewPool[T Container](fn func(capacity int) T) *Pool[T] { - p := &Pool[T]{fn: fn} - p.sp.New = func() any { - return fn(int(atomic.LoadUint32(&p.ca))) - } - return p + return &Pool[T]{fn: fn} } type Pool[T Container] struct { sp sync.Pool fn func(capacity int) T - ca uint32 } func (p *Pool[T]) Get(length, capacity int) T { - atomic.StoreUint32(&p.ca, uint32(capacity)) - s := p.sp.Get().(T) - if s.Capacity() < capacity { + s, ok := p.sp.Get().(T) + if !ok { + s = p.fn(capacity) + } else if s.Capacity() < capacity { p.sp.Put(s) s = p.fn(capacity) } @@ -36,5 +31,6 @@ func (p *Pool[T]) Get(length, capacity int) T { } func (p *Pool[T]) Put(s T) { + s.ResetLen(0) p.sp.Put(s) } diff --git a/mux.go b/mux.go index da2c6fdd..001cd50f 100644 --- a/mux.go +++ b/mux.go @@ -22,27 +22,6 @@ type singleconnect struct { g sync.WaitGroup } -type batchcache struct { - cIndexes []int - commands []CacheableTTL -} - -func (r *batchcache) Capacity() int { - return cap(r.commands) -} - -func (r *batchcache) ResetLen(n int) { - r.cIndexes = r.cIndexes[:n] - r.commands = r.commands[:n] -} - -var batchcachep = util.NewPool(func(capacity int) *batchcache { - return &batchcache{ - cIndexes: make([]int, 0, capacity), - commands: make([]CacheableTTL, 0, capacity), - } -}) - type conn interface { Do(ctx context.Context, cmd Completed) RedisResult DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult @@ -399,53 +378,3 @@ func slotfn(n int, ks uint16, noreply bool) uint16 { } return uint16(util.FastRand(n)) } - -type muxslots struct { - s []int -} - -func (r *muxslots) Capacity() int { - return cap(r.s) -} - -func (r *muxslots) ResetLen(n int) { - r.s = r.s[:n] - for i := 0; i < n; i++ { - r.s[i] = 0 - } -} - -func (r *muxslots) LessThen(n int) bool { - count := 0 - for _, value := range r.s { - if value > 0 { - if count++; count == n { - return false - } - } - } - return true -} - -var muxslotsp = util.NewPool(func(capacity int) *muxslots { - return &muxslots{s: make([]int, 0, capacity)} -}) - -type batchcachemap struct { - m map[uint16]*batchcache - n int -} - -func (r *batchcachemap) Capacity() int { - return r.n -} - -func (r *batchcachemap) ResetLen(n int) { - for k := range r.m { - delete(r.m, k) - } -} - -var batchcachemaps = util.NewPool(func(capacity int) *batchcachemap { - return &batchcachemap{m: make(map[uint16]*batchcache, capacity), n: capacity} -}) diff --git a/pipe.go b/pipe.go index 29bae329..149cd808 100644 --- a/pipe.go +++ b/pipe.go @@ -17,7 +17,6 @@ import ( "time" "github.com/redis/rueidis/internal/cmds" - "github.com/redis/rueidis/internal/util" ) const LibName = "rueidis" @@ -43,44 +42,6 @@ type wire interface { SetOnCloseHook(fn func(error)) } -type redisresults struct { - s []RedisResult -} - -func (r *redisresults) Capacity() int { - return cap(r.s) -} - -func (r *redisresults) ResetLen(n int) { - r.s = r.s[:n] - for i := 0; i < n; i++ { - r.s[i] = RedisResult{} - } -} - -var resultsp = util.NewPool(func(capacity int) *redisresults { - return &redisresults{s: make([]RedisResult, 0, capacity)} -}) - -type cacheentries struct { - e map[int]CacheEntry - c int -} - -func (c *cacheentries) Capacity() int { - return c.c -} - -func (c *cacheentries) ResetLen(n int) { - for k := range c.e { - delete(c.e, k) - } -} - -var entriesp = util.NewPool(func(capacity int) *cacheentries { - return &cacheentries{e: make(map[int]CacheEntry, capacity), c: capacity} -}) - var _ wire = (*pipe)(nil) type pipe struct { diff --git a/resp.go b/resp.go index 311f68b5..7671c5e6 100644 --- a/resp.go +++ b/resp.go @@ -329,6 +329,7 @@ next: lr.R = i lr.N = n n, err = io.Copy(w, lr) + lr.R = nil lrs.Put(lr) } else if typ == typeChunk { return n, err, true diff --git a/syncp.go b/syncp.go new file mode 100644 index 00000000..6ddad6ad --- /dev/null +++ b/syncp.go @@ -0,0 +1,241 @@ +package rueidis + +import "github.com/redis/rueidis/internal/util" + +var ( + resultsp = util.NewPool(func(capacity int) *redisresults { + return &redisresults{s: make([]RedisResult, 0, capacity)} + }) + mgetcmdsp = util.NewPool(func(capacity int) *mgetcmds { + return &mgetcmds{s: make([]Completed, 0, capacity)} + }) + retryp = util.NewPool(func(capacity int) *retry { + return &retry{ + cIndexes: make([]int, 0, capacity), + commands: make([]Completed, 0, capacity), + } + }) + mgetcachecmdsp = util.NewPool(func(capacity int) *mgetcachecmds { + return &mgetcachecmds{s: make([]CacheableTTL, 0, capacity)} + }) + retrycachep = util.NewPool(func(capacity int) *retrycache { + return &retrycache{ + cIndexes: make([]int, 0, capacity), + commands: make([]CacheableTTL, 0, capacity), + } + }) + batchcachep = util.NewPool(func(capacity int) *batchcache { + return &batchcache{ + cIndexes: make([]int, 0, capacity), + commands: make([]CacheableTTL, 0, capacity), + } + }) + batchcachemaps = util.NewPool(func(capacity int) *batchcachemap { + return &batchcachemap{m: make(map[uint16]*batchcache, capacity), n: capacity} + }) + muxslotsp = util.NewPool(func(capacity int) *muxslots { + return &muxslots{s: make([]int, 0, capacity)} + }) + connretryp = util.NewPool(func(capacity int) *connretry { + return &connretry{m: make(map[conn]*retry, capacity), n: capacity} + }) + conncountp = util.NewPool(func(capacity int) *conncount { + return &conncount{m: make(map[conn]int, capacity), n: capacity} + }) + connretrycachep = util.NewPool(func(capacity int) *connretrycache { + return &connretrycache{m: make(map[conn]*retrycache, capacity), n: capacity} + }) +) + +type muxslots struct { + s []int +} + +func (r *muxslots) Capacity() int { + return cap(r.s) +} + +func (r *muxslots) ResetLen(n int) { + clear(r.s) + r.s = r.s[:n] +} + +func (r *muxslots) LessThen(n int) bool { + count := 0 + for _, value := range r.s { + if value > 0 { + if count++; count == n { + return false + } + } + } + return true +} + +type redisresults struct { + s []RedisResult +} + +func (r *redisresults) Capacity() int { + return cap(r.s) +} + +func (r *redisresults) ResetLen(n int) { + clear(r.s) + r.s = r.s[:n] +} + +type cacheentries struct { + e map[int]CacheEntry + c int +} + +func (c *cacheentries) Capacity() int { + return c.c +} + +func (c *cacheentries) ResetLen(n int) { + clear(c.e) +} + +var entriesp = util.NewPool(func(capacity int) *cacheentries { + return &cacheentries{e: make(map[int]CacheEntry, capacity), c: capacity} +}) + +type mgetcachecmds struct { + s []CacheableTTL +} + +func (r *mgetcachecmds) Capacity() int { + return cap(r.s) +} + +func (r *mgetcachecmds) ResetLen(n int) { + clear(r.s) + r.s = r.s[:n] +} + +type mgetcmds struct { + s []Completed +} + +func (r *mgetcmds) Capacity() int { + return cap(r.s) +} + +func (r *mgetcmds) ResetLen(n int) { + clear(r.s) + r.s = r.s[:n] +} + +type retry struct { + cIndexes []int + commands []Completed + aIndexes []int + cAskings []Completed +} + +func (r *retry) Capacity() int { + return cap(r.commands) +} + +func (r *retry) ResetLen(n int) { + clear(r.cIndexes) + clear(r.commands) + clear(r.aIndexes) + clear(r.cAskings) + r.cIndexes = r.cIndexes[:n] + r.commands = r.commands[:n] + r.aIndexes = r.aIndexes[:0] + r.cAskings = r.cAskings[:0] +} + +type retrycache struct { + cIndexes []int + commands []CacheableTTL + aIndexes []int + cAskings []CacheableTTL +} + +func (r *retrycache) Capacity() int { + return cap(r.commands) +} + +func (r *retrycache) ResetLen(n int) { + clear(r.cIndexes) + clear(r.commands) + clear(r.aIndexes) + clear(r.cAskings) + r.cIndexes = r.cIndexes[:n] + r.commands = r.commands[:n] + r.aIndexes = r.aIndexes[:0] + r.cAskings = r.cAskings[:0] +} + +type batchcache struct { + cIndexes []int + commands []CacheableTTL +} + +func (r *batchcache) Capacity() int { + return cap(r.commands) +} + +func (r *batchcache) ResetLen(n int) { + clear(r.cIndexes) + clear(r.commands) + r.cIndexes = r.cIndexes[:n] + r.commands = r.commands[:n] +} + +type batchcachemap struct { + m map[uint16]*batchcache + n int +} + +func (r *batchcachemap) Capacity() int { + return r.n +} + +func (r *batchcachemap) ResetLen(n int) { + clear(r.m) +} + +type conncount struct { + m map[conn]int + n int +} + +func (r *conncount) Capacity() int { + return r.n +} + +func (r *conncount) ResetLen(n int) { + clear(r.m) +} + +type connretry struct { + m map[conn]*retry + n int +} + +func (r *connretry) Capacity() int { + return r.n +} + +func (r *connretry) ResetLen(n int) { + clear(r.m) +} + +type connretrycache struct { + m map[conn]*retrycache + n int +} + +func (r *connretrycache) Capacity() int { + return r.n +} + +func (r *connretrycache) ResetLen(n int) { + clear(r.m) +}