From 8a61462cec77438e611e514bb52babca1c366afb Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 13 Oct 2024 11:40:56 -0700 Subject: [PATCH] feat: extend RetryDelayFn to take the command to be retried Signed-off-by: Rueian --- client.go | 20 ++++++++++---------- client_test.go | 46 +++++++++++++++++++++++----------------------- cluster.go | 20 ++++++++++---------- retry.go | 23 ++++++++++++----------- retry_test.go | 38 +++++++++++++++++++------------------- sentinel.go | 14 +++++++------- 6 files changed, 81 insertions(+), 80 deletions(-) diff --git a/client.go b/client.go index 27aa2517..b97b4e9b 100644 --- a/client.go +++ b/client.go @@ -49,7 +49,7 @@ retry: resp = c.conn.Do(ctx, cmd) if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, cmd, resp.Error(), ) if shouldRetry { attempts++ @@ -87,10 +87,10 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [ retry: resps = c.conn.DoMulti(ctx, multi...).s if c.retry && allReadOnly(multi) { - for _, resp := range resps { + for i, resp := range resps { if c.isRetryable(resp.NonRedisError(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, multi[i], resp.Error(), ) if shouldRetry { attempts++ @@ -115,10 +115,10 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL) retry: resps = c.conn.DoMultiCache(ctx, multi...).s if c.retry { - for _, resp := range resps { + for i, resp := range resps { if c.isRetryable(resp.NonRedisError(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, Completed(multi[i].Cmd), resp.Error(), ) if shouldRetry { attempts++ @@ -140,7 +140,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura retry: resp = c.conn.DoCache(ctx, cmd, ttl) if c.retry && c.isRetryable(resp.NonRedisError(), ctx) { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error()) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) if shouldRetry { attempts++ goto retry @@ -158,7 +158,7 @@ retry: err = c.conn.Receive(ctx, subscribe, fn) if c.retry { if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err) if shouldRetry { attempts++ goto retry @@ -217,7 +217,7 @@ retry: resp = c.wire.Do(ctx, cmd) if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, cmd, resp.Error(), ) if shouldRetry { attempts++ @@ -247,7 +247,7 @@ retry: for i, cmd := range multi { if retryable && isRetryable(resp[i].NonRedisError(), c.wire, ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp[i].Error(), + ctx, attempts, multi[i], resp[i].Error(), ) if shouldRetry { attempts++ @@ -271,7 +271,7 @@ retry: if c.retry { if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, err, + ctx, attempts, subscribe, err, ) if shouldRetry { attempts++ diff --git a/client_test.go b/client_test.go index b3c9c00c..90c69240 100644 --- a/client_test.go +++ b/client_test.go @@ -698,21 +698,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { c, m := setup() if cli, ok := c.(*sentinelClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*clusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*singleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -768,17 +768,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { c, m := setup() if cli, ok := c.(*sentinelClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*clusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return -1 }, WaitForRetryFn: func(ctx context.Context, duration time.Duration) { @@ -790,7 +790,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { } if cli, ok := c.(*singleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -846,21 +846,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { c, m := setup() if cli, ok := c.(*sentinelClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*clusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*singleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -908,17 +908,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { c, m := setup() if cli, ok := c.(*sentinelClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*clusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return -1 }, WaitForRetryFn: func(ctx context.Context, duration time.Duration) { @@ -930,7 +930,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { } if cli, ok := c.(*singleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -975,21 +975,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { c, m := setup() if cli, ok := c.(*sentinelClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*clusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := c.(*singleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -1052,14 +1052,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { if ret := c.Dedicated(func(cc DedicatedClient) error { if cli, ok := cc.(*dedicatedClusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := cc.(*dedicatedSingleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -1137,14 +1137,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { if ret := c.Dedicated(func(cc DedicatedClient) error { if cli, ok := cc.(*dedicatedClusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := cc.(*dedicatedSingleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } @@ -1216,14 +1216,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) { if ret := c.Dedicated(func(cc DedicatedClient) error { if cli, ok := cc.(*dedicatedClusterClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } } if cli, ok := cc.(*dedicatedSingleClient); ok { cli.retryHandler = &mockRetryHandler{ - WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool { + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool { return false }, } diff --git a/cluster.go b/cluster.go index d8cd27ee..6ae425f3 100644 --- a/cluster.go +++ b/cluster.go @@ -481,7 +481,7 @@ process: goto process case RedirectRetry: if c.retry && cmd.IsReadOnly() { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error()) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error()) if shouldRetry { attempts++ goto retry @@ -614,7 +614,7 @@ func (c *clusterClient) doresultfn( continue } - retryDelay = c.retryHandler.RetryDelay(attempts, resp.Error()) + retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error()) } else { nc = c.redirectOrNew(addr, cc, cm.Slot(), mode) } @@ -753,7 +753,7 @@ process: } case RedirectRetry: if c.retry && allReadOnly(multi) { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error()) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, multi[i], resp.Error()) if shouldRetry { resultsp.Put(resps) attempts++ @@ -786,7 +786,7 @@ process: goto process case RedirectRetry: if c.retry { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error()) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) if shouldRetry { attempts++ goto retry @@ -930,7 +930,7 @@ func (c *clusterClient) resultcachefn( continue } - retryDelay = c.retryHandler.RetryDelay(attempts, resp.Error()) + retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error()) } else { nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode) } @@ -1037,7 +1037,7 @@ retry: } err = cc.Receive(ctx, subscribe, fn) if _, mode := c.shouldRefreshRetry(err, ctx); c.retry && mode != RedirectNone { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err) if shouldRetry { attempts++ goto retry @@ -1222,7 +1222,7 @@ retry: case RedirectRetry: if c.retry && cmd.IsReadOnly() && w.Error() == nil { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, cmd, resp.Error(), ) if shouldRetry { attempts++ @@ -1253,11 +1253,11 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...Completed retry: if w, err := c.acquire(ctx, slot); err == nil { resp = w.DoMulti(ctx, multi...).s - for _, r := range resp { + for i, r := range resp { _, mode := c.client.shouldRefreshRetry(r.Error(), ctx) if mode == RedirectRetry && retryable && w.Error() == nil { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, r.Error(), + ctx, attempts, multi[i], r.Error(), ) if shouldRetry { attempts++ @@ -1291,7 +1291,7 @@ retry: if w, err = c.acquire(ctx, subscribe.Slot()); err == nil { err = w.Receive(ctx, subscribe, fn) if _, mode := c.client.shouldRefreshRetry(err, ctx); c.retry && mode == RedirectRetry && w.Error() == nil { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err) if shouldRetry { attempts++ goto retry diff --git a/retry.go b/retry.go index 7c5011d0..1f50bab2 100644 --- a/retry.go +++ b/retry.go @@ -14,14 +14,15 @@ const ( // RetryDelayFn returns the delay that should be used before retrying the // attempt. Will return negative delay if the delay could not be determined or do not retry. -type RetryDelayFn func(attempts int, err error) time.Duration +type RetryDelayFn func(attempts int, cmd Completed, err error) time.Duration // defaultRetryDelayFn delays the next retry exponentially without considering the error. // max delay is 1 second. -func defaultRetryDelayFn(attempts int, _ error) time.Duration { - base := time.Microsecond * (1 << min(defaultMaxRetries, attempts)) - jitter := time.Microsecond * time.Duration(util.FastRand(1< defaultMaxRetryDelay { t.Errorf("defaultRetryDelayFn(%d, %v) = %v; want >= 0 and <= %v", i, err, got, defaultMaxRetryDelay) @@ -40,12 +40,12 @@ func TestDefaultRetryDelay(t *testing.T) { func TestRetryer_RetryDelay(t *testing.T) { r := &retryer{ - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return time.Second }, } - got := r.RetryDelay(0, nil) + got := r.RetryDelay(0, Completed{}, nil) if got != time.Second { t.Errorf("RetryDelay() = %v; want %v", got, time.Second) } @@ -113,12 +113,12 @@ func TestRetryer_WaitForRetry(t *testing.T) { func TestRetrier_WaitOrSkipRetry(t *testing.T) { t.Run("RetryDelayFn returns negative delay", func(t *testing.T) { r := &retryer{ - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return -1 * time.Second }, } - shouldRetry := r.WaitOrSkipRetry(nil, 0, nil) + shouldRetry := r.WaitOrSkipRetry(nil, 0, Completed{}, nil) if shouldRetry { t.Error("WaitOrSkipRetry() = true; want false") } @@ -126,7 +126,7 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { t.Run("context is canceled", func(t *testing.T) { r := &retryer{ - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return time.Second }, } @@ -134,7 +134,7 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - shouldRetry := r.WaitOrSkipRetry(ctx, 0, nil) + shouldRetry := r.WaitOrSkipRetry(ctx, 0, Completed{}, nil) if !shouldRetry { t.Error("WaitOrSkipRetry() = false; want true") } @@ -142,7 +142,7 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { t.Run("context deadline is before delay", func(t *testing.T) { r := &retryer{ - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return time.Second }, } @@ -151,7 +151,7 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { defer cancel() start := time.Now() - shouldRetry := r.WaitOrSkipRetry(ctx, 0, nil) + shouldRetry := r.WaitOrSkipRetry(ctx, 0, Completed{}, nil) if shouldRetry { t.Error("WaitOrSkipRetry() = true; want false") } @@ -164,7 +164,7 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { t.Run("wait until next retry", func(t *testing.T) { r := &retryer{ - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return 50 * time.Millisecond }, } @@ -173,7 +173,7 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { defer cancel() start := time.Now() - shouldRetry := r.WaitOrSkipRetry(ctx, 0, nil) + shouldRetry := r.WaitOrSkipRetry(ctx, 0, Completed{}, nil) if !shouldRetry { t.Error("WaitOrSkipRetry() = false; want true") } @@ -186,13 +186,13 @@ func TestRetrier_WaitOrSkipRetry(t *testing.T) { t.Run("empty context", func(t *testing.T) { r := &retryer{ - RetryDelayFn: func(attempts int, err error) time.Duration { + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { return 50 * time.Millisecond }, } start := time.Now() - shouldRetry := r.WaitOrSkipRetry(context.Background(), 0, nil) + shouldRetry := r.WaitOrSkipRetry(context.Background(), 0, Completed{}, nil) if !shouldRetry { t.Error("WaitOrSkipRetry() = false; want true") } diff --git a/sentinel.go b/sentinel.go index 75a65cd6..5d2318e0 100644 --- a/sentinel.go +++ b/sentinel.go @@ -68,7 +68,7 @@ retry: resp = c.mConn.Load().(conn).Do(ctx, cmd) if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, cmd, resp.Error(), ) if shouldRetry { attempts++ @@ -90,10 +90,10 @@ func (c *sentinelClient) DoMulti(ctx context.Context, multi ...Completed) []Redi retry: resps := c.mConn.Load().(conn).DoMulti(ctx, multi...) if c.retry && allReadOnly(multi) { - for _, resp := range resps.s { + for i, resp := range resps.s { if c.isRetryable(resp.NonRedisError(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, multi[i], resp.Error(), ) if shouldRetry { resultsp.Put(resps) @@ -116,7 +116,7 @@ func (c *sentinelClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Du retry: resp = c.mConn.Load().(conn).DoCache(ctx, cmd, ttl) if c.retry && c.isRetryable(resp.NonRedisError(), ctx) { - shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error()) + shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) if shouldRetry { attempts++ goto retry @@ -137,10 +137,10 @@ func (c *sentinelClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL retry: resps := c.mConn.Load().(conn).DoMultiCache(ctx, multi...) if c.retry { - for _, resp := range resps.s { + for i, resp := range resps.s { if c.isRetryable(resp.NonRedisError(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, resp.Error(), + ctx, attempts, Completed(multi[i].Cmd), resp.Error(), ) if shouldRetry { resultsp.Put(resps) @@ -165,7 +165,7 @@ retry: if c.retry { if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( - ctx, attempts, err, + ctx, attempts, subscribe, err, ) if shouldRetry { attempts++