diff --git a/pipe.go b/pipe.go index 33ad8225..79a2fa64 100644 --- a/pipe.go +++ b/pipe.go @@ -1315,6 +1315,11 @@ func (p *pipe) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) Re if err != nil { if _, ok := err.(*RedisError); ok { err = ErrDoCacheAborted + if preErr := resp.s[2].Error(); preErr != nil { // if PTTL command get a RedisError + if _, ok := preErr.(*RedisError); ok { + err = preErr + } + } } p.cache.Cancel(ck, cc, err) return newErrResult(err) @@ -1379,6 +1384,14 @@ func (p *pipe) doCacheMGet(ctx context.Context, cmd Cacheable, ttl time.Duration if err != nil { if _, ok := err.(*RedisError); ok { err = ErrDoCacheAborted + for j := 0; j < keys+1; j++ { + if preErr := resp.s[len(multi)-2-j].Error(); preErr != nil { + if _, ok := preErr.(*RedisError); ok { + err = preErr + break + } + } + } } for _, key := range rewritten.Commands()[1 : keys+1] { p.cache.Cancel(key, mgetcc, err) @@ -1474,6 +1487,11 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre if err := resp.s[i].Error(); err != nil { if _, ok := err.(*RedisError); ok { err = ErrDoCacheAborted + if preErr := resp.s[i-2].Error(); preErr != nil { // if PTTL command get a RedisError + if _, ok := preErr.(*RedisError); ok { + err = preErr + } + } } ck, cc := cmds.CacheKey(Cacheable(missing[i-1])) p.cache.Cancel(ck, cc, err) @@ -1497,6 +1515,11 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre if err != nil { if _, ok := err.(*RedisError); ok { err = ErrDoCacheAborted + if preErr := resp.s[i-2].Error(); preErr != nil { // if PTTL command get a RedisError + if _, ok := preErr.(*RedisError); ok { + err = preErr + } + } } results.s[j] = newErrResult(err) } else { diff --git a/pipe_test.go b/pipe_test.go index 7491ddb3..6ad8093b 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -1316,25 +1316,45 @@ func TestClientSideCachingExecAbort(t *testing.T) { go func() { mock.Expect("CLIENT", "CACHING", "YES"). Expect("MULTI"). - Expect("PTTL", "a"). - Expect("GET", "a"). + Expect("PTTL", "a1"). + Expect("GET", "a1"). Expect("EXEC"). ReplyString("OK"). ReplyString("OK"). ReplyString("OK"). ReplyString("OK"). Reply(RedisMessage{typ: '_'}) + mock.Expect("CLIENT", "CACHING", "YES"). + Expect("MULTI"). + Expect("PTTL", "a2"). + Expect("GET", "a2"). + Expect("EXEC"). + ReplyString("OK"). + ReplyString("OK"). + ReplyError("MOVED 0 127.0.0.1"). + ReplyError("MOVED 0 127.0.0.1"). + Reply(RedisMessage{typ: '_'}) }() - v, err := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", "a"})), 10*time.Second).ToMessage() - if err != ErrDoCacheAborted { - t.Errorf("unexpected err, got %v", err) - } - if v.IsCacheHit() { - t.Errorf("unexpected cache hit") - } - if v, entry := p.cache.Flight("a", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil { - t.Errorf("unexpected cache value and entry %v %v", v, entry) + for i, key := range []string{"a1", "a2"} { + v, err := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", key})), 10*time.Second).ToMessage() + if i == 0 { + if err != ErrDoCacheAborted { + t.Errorf("unexpected err, got %v", err) + } + } else { + if re, ok := err.(*RedisError); !ok { + t.Errorf("unexpected err, got %v", err) + } else if _, moved := re.IsMoved(); !moved { + t.Errorf("unexpected err, got %v", err) + } + } + if v.IsCacheHit() { + t.Errorf("unexpected cache hit") + } + if v, entry := p.cache.Flight(key, "GET", time.Second, time.Now()); v.typ != 0 || entry != nil { + t.Errorf("unexpected cache value and entry %v %v", v, entry) + } } } @@ -1641,20 +1661,54 @@ func TestClientSideCachingExecAbortMGet(t *testing.T) { ReplyString("OK"). ReplyString("OK"). Reply(RedisMessage{typ: '_'}) + mock.Expect("CLIENT", "CACHING", "YES"). + Expect("MULTI"). + Expect("PTTL", "b1"). + Expect("PTTL", "b2"). + Expect("MGET", "b1", "b2"). + Expect("EXEC"). + ReplyString("OK"). + ReplyString("OK"). + ReplyError("MOVED 0 127.0.0.1"). + ReplyString("OK"). + ReplyString("OK"). + Reply(RedisMessage{typ: '_'}) + mock.Expect("CLIENT", "CACHING", "YES"). + Expect("MULTI"). + Expect("PTTL", "c1"). + Expect("PTTL", "c2"). + Expect("MGET", "c1", "c2"). + Expect("EXEC"). + ReplyString("OK"). + ReplyString("OK"). + ReplyString("OK"). + ReplyError("MOVED 0 127.0.0.1"). + ReplyString("OK"). + Reply(RedisMessage{typ: '_'}) }() - v, err := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", "a1", "a2"})), 10*time.Second).ToMessage() - if err != ErrDoCacheAborted { - t.Errorf("unexpected err, got %v", err) - } - if v.IsCacheHit() { - t.Errorf("unexpected cache hit") - } - if v, entry := p.cache.Flight("a1", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil { - t.Errorf("unexpected cache value and entry %v %v", v, entry) - } - if v, entry := p.cache.Flight("a2", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil { - t.Errorf("unexpected cache value and entry %v %v", v, entry) + for i, pair := range [][2]string{{"a1", "a2"}, {"b1", "b2"}, {"c1", "c2"}} { + v, err := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", pair[0], pair[1]})), 10*time.Second).ToMessage() + if i == 0 { + if err != ErrDoCacheAborted { + t.Errorf("unexpected err, got %v", err) + } + } else { + if re, ok := err.(*RedisError); !ok { + t.Errorf("unexpected err, got %v", err) + } else if _, moved := re.IsMoved(); !moved { + t.Errorf("unexpected err, got %v", err) + } + } + if v.IsCacheHit() { + t.Errorf("unexpected cache hit") + } + if v, entry := p.cache.Flight(pair[0], "GET", time.Second, time.Now()); v.typ != 0 || entry != nil { + t.Errorf("unexpected cache value and entry %v %v", v, entry) + } + if v, entry := p.cache.Flight(pair[1], "GET", time.Second, time.Now()); v.typ != 0 || entry != nil { + t.Errorf("unexpected cache value and entry %v %v", v, entry) + } } } @@ -1925,6 +1979,11 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) { Expect("PTTL", "a2"). Expect("GET", "a2"). Expect("EXEC"). + Expect("CLIENT", "CACHING", "YES"). + Expect("MULTI"). + Expect("PTTL", "a3"). + Expect("GET", "a3"). + Expect("EXEC"). ReplyString("OK"). ReplyString("OK"). ReplyString("OK"). @@ -1937,12 +1996,18 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) { ReplyString("OK"). ReplyString("OK"). ReplyString("OK"). + Reply(RedisMessage{typ: '_'}). + ReplyString("OK"). + ReplyString("OK"). + ReplyError("MOVED 0 127.0.0.1"). + Reply(RedisMessage{typ: '_'}). Reply(RedisMessage{typ: '_'}) }() arr := p.DoMultiCache(context.Background(), []CacheableTTL{ CT(Cacheable(cmds.NewCompleted([]string{"GET", "a1"})), time.Second*10), CT(Cacheable(cmds.NewCompleted([]string{"GET", "a2"})), time.Second*10), + CT(Cacheable(cmds.NewCompleted([]string{"GET", "a3"})), time.Second*10), }...).s for i, resp := range arr { v, err := resp.ToMessage() @@ -1950,13 +2015,22 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) { if v.integer != 1 { t.Errorf("unexpected cached response, expected %v, got %v", 1, v.integer) } - } else { + } else if i == 1 { if err != ErrDoCacheAborted { t.Errorf("unexpected err, got %v", err) } if v.IsCacheHit() { t.Errorf("unexpected cache hit") } + } else if i == 2 { + if re, ok := err.(*RedisError); !ok { + t.Errorf("unexpected err, got %v", err) + } else if _, moved := re.IsMoved(); !moved { + t.Errorf("unexpected err, got %v", err) + } + if v.IsCacheHit() { + t.Errorf("unexpected cache hit") + } } } if v, entry := p.cache.Flight("a1", "GET", time.Second, time.Now()); v.integer != 1 {