Skip to content

Commit

Permalink
fix: cache auto retry on MOVED / ASK / ...
Browse files Browse the repository at this point in the history
  • Loading branch information
wuyuxiang committed Dec 16, 2024
1 parent dac10aa commit 891d632
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 24 deletions.
23 changes: 23 additions & 0 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,11 @@ func (p *pipe) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) Re
if err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[2].Error(); preErr != nil { // if PTTL command get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
p.cache.Cancel(ck, cc, err)
return newErrResult(err)
Expand Down Expand Up @@ -1379,6 +1384,14 @@ func (p *pipe) doCacheMGet(ctx context.Context, cmd Cacheable, ttl time.Duration
if err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
for j := 0; j < keys+1; j++ {
if preErr := resp.s[len(multi)-2-j].Error(); preErr != nil {
if _, ok := preErr.(*RedisError); ok {
err = preErr
break
}
}
}
}
for _, key := range rewritten.Commands()[1 : keys+1] {
p.cache.Cancel(key, mgetcc, err)
Expand Down Expand Up @@ -1474,6 +1487,11 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
if err := resp.s[i].Error(); err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[i-2].Error(); preErr != nil { // if PTTL command get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
ck, cc := cmds.CacheKey(Cacheable(missing[i-1]))
p.cache.Cancel(ck, cc, err)
Expand All @@ -1497,6 +1515,11 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
if err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[i-2].Error(); preErr != nil { // if PTTL command get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
results.s[j] = newErrResult(err)
} else {
Expand Down
122 changes: 98 additions & 24 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,25 +1316,45 @@ func TestClientSideCachingExecAbort(t *testing.T) {
go func() {
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a").
Expect("GET", "a").
Expect("PTTL", "a1").
Expect("GET", "a1").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'})
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a2").
Expect("GET", "a2").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
ReplyError("MOVED 0 127.0.0.1").
Reply(RedisMessage{typ: '_'})
}()

v, err := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", "a"})), 10*time.Second).ToMessage()
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight("a", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
for i, key := range []string{"a1", "a2"} {
v, err := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", key})), 10*time.Second).ToMessage()
if i == 0 {
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
} else {
if re, ok := err.(*RedisError); !ok {
t.Errorf("unexpected err, got %v", err)
} else if _, moved := re.IsMoved(); !moved {
t.Errorf("unexpected err, got %v", err)
}
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight(key, "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}
}

Expand Down Expand Up @@ -1641,20 +1661,54 @@ func TestClientSideCachingExecAbortMGet(t *testing.T) {
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'})
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "b1").
Expect("PTTL", "b2").
Expect("MGET", "b1", "b2").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'})
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "c1").
Expect("PTTL", "c2").
Expect("MGET", "c1", "c2").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
ReplyString("OK").
Reply(RedisMessage{typ: '_'})
}()

v, err := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", "a1", "a2"})), 10*time.Second).ToMessage()
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight("a1", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
if v, entry := p.cache.Flight("a2", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
for i, pair := range [][2]string{{"a1", "a2"}, {"b1", "b2"}, {"c1", "c2"}} {
v, err := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", pair[0], pair[1]})), 10*time.Second).ToMessage()
if i == 0 {
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
} else {
if re, ok := err.(*RedisError); !ok {
t.Errorf("unexpected err, got %v", err)
} else if _, moved := re.IsMoved(); !moved {
t.Errorf("unexpected err, got %v", err)
}
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight(pair[0], "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
if v, entry := p.cache.Flight(pair[1], "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}
}

Expand Down Expand Up @@ -1925,6 +1979,11 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) {
Expect("PTTL", "a2").
Expect("GET", "a2").
Expect("EXEC").
Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a3").
Expect("GET", "a3").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Expand All @@ -1937,26 +1996,41 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) {
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'}).
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
Reply(RedisMessage{typ: '_'}).
Reply(RedisMessage{typ: '_'})
}()

arr := p.DoMultiCache(context.Background(), []CacheableTTL{
CT(Cacheable(cmds.NewCompleted([]string{"GET", "a1"})), time.Second*10),
CT(Cacheable(cmds.NewCompleted([]string{"GET", "a2"})), time.Second*10),
CT(Cacheable(cmds.NewCompleted([]string{"GET", "a3"})), time.Second*10),
}...).s
for i, resp := range arr {
v, err := resp.ToMessage()
if i == 0 {
if v.integer != 1 {
t.Errorf("unexpected cached response, expected %v, got %v", 1, v.integer)
}
} else {
} else if i == 1 {
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
} else if i == 2 {
if re, ok := err.(*RedisError); !ok {
t.Errorf("unexpected err, got %v", err)
} else if _, moved := re.IsMoved(); !moved {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
}
}
if v, entry := p.cache.Flight("a1", "GET", time.Second, time.Now()); v.integer != 1 {
Expand Down

0 comments on commit 891d632

Please sign in to comment.