Skip to content

Commit

Permalink
feat: add DisableAutoPipelining to serve requests from the connection…
Browse files Browse the repository at this point in the history
… pool (#646)

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored Oct 13, 2024
1 parent 8a61462 commit b93778c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 22 deletions.
23 changes: 16 additions & 7 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type mux struct {
mu []sync.Mutex
maxp int
maxm int

usePool bool
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
Expand Down Expand Up @@ -90,6 +92,8 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi
sc: make([]*singleconnect, multiplex),
maxp: runtime.GOMAXPROCS(0),
maxm: option.BlockingPipeline,

usePool: option.DisableAutoPipelining,
}
m.clhks.Store(emptyclhks)
for i := 0; i < len(m.wire); i++ {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if cmd.IsBlock() {
if (m.usePool && !cmd.NoReply()) || cmd.IsBlock() {
resp = m.blocking(ctx, cmd)
} else {
resp = m.pipeline(ctx, cmd)
Expand All @@ -209,40 +213,45 @@ func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) DoMulti(ctx context.Context, multi ...Completed) (resp *redisresults) {
if len(multi) >= m.maxm && m.maxm > 0 {
if m.usePool || (len(multi) >= m.maxm && m.maxm > 0) {
goto block // use a dedicated connection if the pipeline is too large
}
for _, cmd := range multi {
if cmd.IsBlock() {
cmds.ToBlock(&multi[0]) // mark the first cmd as block if one of them is block to shortcut later check.
goto block
}
}
return m.pipelineMulti(ctx, multi)
block:
cmds.ToBlock(&multi[0]) // mark the first cmd as block if one of them is block to shortcut later check.
for _, cmd := range multi {
if cmd.NoReply() {
return m.pipelineMulti(ctx, multi)
}
}
return m.blockingMulti(ctx, multi)
}

func (m *mux) blocking(ctx context.Context, cmd Completed) (resp RedisResult) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.Do(ctx, cmd)
if resp.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
wire := m.dpool.Acquire()
wire := m.spool.Acquire()
resp = wire.DoMulti(ctx, cmd...)
for _, res := range resp.s {
if res.NonRedisError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
break
}
}
m.dpool.Store(wire)
m.spool.Store(wire)
return resp
}

Expand Down
4 changes: 2 additions & 2 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestMuxReuseWire(t *testing.T) {
t.Fatalf("unexpected dial error %v", err)
}

wire1 := m.Acquire()
wire1 := m.spool.Acquire()

go func() {
// this should use the second wire
Expand All @@ -215,7 +215,7 @@ func TestMuxReuseWire(t *testing.T) {
}()
<-blocking

m.Store(wire1)
m.spool.Store(wire1)
// this should use the first wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
Expand Down
35 changes: 22 additions & 13 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,23 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
return nil, err
}
p = &pipe{
conn: conn,
queue: newRing(option.RingScaleEachConn),
r: bufio.NewReaderSize(conn, option.ReadBufferEachConn),
w: bufio.NewWriterSize(conn, option.WriteBufferEachConn),

nsubs: newSubs(),
psubs: newSubs(),
ssubs: newSubs(),
close: make(chan struct{}),
conn: conn,
r: bufio.NewReaderSize(conn, option.ReadBufferEachConn),
w: bufio.NewWriterSize(conn, option.WriteBufferEachConn),

timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,
maxFlushDelay: option.MaxFlushDelay,

r2ps: r2ps,
}
if !nobg {
p.queue = newRing(option.RingScaleEachConn)
p.nsubs = newSubs()
p.psubs = newSubs()
p.ssubs = newSubs()
p.close = make(chan struct{})
}
if !r2ps {
p.r2psFn = func() (p *pipe, err error) {
return _newPipe(connFn, option, true, nobg)
Expand Down Expand Up @@ -305,8 +306,10 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
}

func (p *pipe) background() {
atomic.CompareAndSwapInt32(&p.state, 0, 1)
p.once.Do(func() { go p._background() })
if p.queue != nil {
atomic.CompareAndSwapInt32(&p.state, 0, 1)
p.once.Do(func() { go p._background() })
}
}

func (p *pipe) _exit(err error) {
Expand Down Expand Up @@ -825,7 +828,7 @@ func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
goto queue
}
dl, ok := ctx.Deadline()
if !ok && ctx.Done() != nil {
if p.queue != nil && !ok && ctx.Done() != nil {
p.background()
goto queue
}
Expand Down Expand Up @@ -896,6 +899,12 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {

for _, cmd := range multi {
if cmd.IsBlock() {
if noReply != 0 {
for i := 0; i < len(resp.s); i++ {
resp.s[i] = newErrResult(ErrBlockingPubSubMixed)
}
return resp
}
atomic.AddInt32(&p.blcksig, 1)
defer func() {
for _, r := range resp.s {
Expand Down Expand Up @@ -925,7 +934,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
goto queue
}
dl, ok := ctx.Deadline()
if !ok && ctx.Done() != nil {
if p.queue != nil && !ok && ctx.Done() != nil {
p.background()
goto queue
}
Expand Down
16 changes: 16 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3033,6 +3033,22 @@ func TestPubSub(t *testing.T) {
}
})

t.Run("PubSub blocking mixed", func(t *testing.T) {
p, _, cancel, _ := setup(t, ClientOption{})
defer cancel()

commands := []Completed{
builder.Subscribe().Channel("a").Build(),
builder.Psubscribe().Pattern("b").Build(),
builder.Blpop().Key("c").Timeout(0).Build(),
}
for _, resp := range p.DoMulti(context.Background(), commands...).s {
if e := resp.Error(); e != ErrBlockingPubSubMixed {
t.Fatalf("unexpected err %v", e)
}
}
})

t.Run("RESP2 pubsub mixed", func(t *testing.T) {
p, _, cancel, _ := setup(t, ClientOption{})
p.version = 5
Expand Down
28 changes: 28 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -805,10 +806,13 @@ func TestSingleClientIntegration(t *testing.T) {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ConnWriteTimeout: 180 * time.Second,
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -820,11 +824,18 @@ func TestSingleClientIntegration(t *testing.T) {
client.Close()
}

func TestSingleClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestSingleClientIntegration(t)
}

func TestSentinelClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379"},
ConnWriteTimeout: 180 * time.Second,
Expand All @@ -833,6 +844,8 @@ func TestSentinelClientIntegration(t *testing.T) {
},
SelectDB: 2, // https://github.com/redis/rueidis/issues/138
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -844,17 +857,26 @@ func TestSentinelClientIntegration(t *testing.T) {
client.Close()
}

func TestSentinelClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestSentinelClientIntegration(t)
}

func TestClusterClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer ShouldNotLeaked(SetupLeakDetection())

client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ConnWriteTimeout: 180 * time.Second,
ShuffleInit: true,
Dialer: net.Dialer{KeepAlive: -1},
PipelineMultiplex: 1,

DisableAutoPipelining: os.Getenv("DisableAutoPipelining") == "true",
})
if err != nil {
t.Fatal(err)
Expand All @@ -864,6 +886,12 @@ func TestClusterClientIntegration(t *testing.T) {
client.Close()
}

func TestClusterClientIntegrationWithPool(t *testing.T) {
os.Setenv("DisableAutoPipelining", "true")
defer os.Unsetenv("DisableAutoPipelining")
TestClusterClientIntegration(t)
}

func TestSingleClient5Integration(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
7 changes: 7 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var (
ErrNoCache = errors.New("ClientOption.DisableCache must be true for redis not supporting client-side caching or not supporting RESP3")
// ErrRESP2PubSubMixed means your redis does not support RESP3 and rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in mixed case
ErrRESP2PubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other commands in RESP2")
// ErrBlockingPubSubMixed rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other blocking commands
ErrBlockingPubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other blocking commands")
// ErrDoCacheAborted means redis abort EXEC request or connection closed
ErrDoCacheAborted = errors.New("failed to fetch the cache because EXEC was aborted by redis or connection closed")
// ErrReplicaOnlyNotSupported means ReplicaOnly flag is not supported by
Expand Down Expand Up @@ -171,6 +173,8 @@ type ClientOption struct {
RetryDelay RetryDelayFn
// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
DisableCache bool
// DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request.
DisableAutoPipelining bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first.
Expand Down Expand Up @@ -358,6 +362,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.DisableAutoPipelining {
option.AlwaysPipelining = false
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
Expand Down

0 comments on commit b93778c

Please sign in to comment.