forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
retry_policy.go
107 lines (90 loc) · 3.59 KB
/
retry_policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package river
import (
"math"
"math/rand"
"sync"
"time"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/timeutil"
"github.com/riverqueue/river/rivertype"
)
// ClientRetryPolicy is an interface that can be implemented to provide a retry
// policy for how River deals with failed jobs at the client level (when a
// worker does not define an override for `NextRetry`). Jobs are scheduled to be
// retried in the future up until they've reached the job's max attempts, at
// which pointed they're set as discarded.
//
// The ClientRetryPolicy does not have access to generics and operates on the
// raw JobRow struct with encoded args.
type ClientRetryPolicy interface {
// NextRetry calculates when the next retry for a failed job should take place
// given when it was last attempted and its number of attempts, or any other
// of the job's properties a user-configured retry policy might want to
// consider.
NextRetry(job *rivertype.JobRow) time.Time
}
// River's default retry policy.
type DefaultClientRetryPolicy struct {
rand *rand.Rand
randMu sync.RWMutex
timeNowFunc func() time.Time
}
// NextRetry gets the next retry given for the given job, accounting for when it
// was last attempted and what attempt number that was. Reschedules using a
// basic exponential backoff of `ATTEMPT^4`, so after the first failure a new
// try will be scheduled in 1 seconds, 16 seconds after the second, 1 minute and
// 21 seconds after the third, etc.
//
// In order to avoid penalizing jobs that are snoozed, the number of errors is
// used instead of the attempt count. This means that snoozing a job (even
// repeatedly) will not lead to a future error having a longer than expected
// retry delay.
func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
// For the purposes of calculating the backoff, we can look solely at the
// number of errors. If we were to use the raw attempt count, this would be
// incemented and influenced by snoozes. However the use case for snoozing is
// to try again later *without* counting as an error.
//
// Note that we explicitly add 1 here, because the error hasn't been appended
// yet at the time this is called (that happens in the completer).
errorCount := len(job.Errors) + 1
return p.timeNowUTC().Add(timeutil.SecondsAsDuration(p.retrySeconds(errorCount)))
}
func (p *DefaultClientRetryPolicy) timeNowUTC() time.Time {
if p.timeNowFunc != nil {
return p.timeNowFunc()
}
return time.Now().UTC()
}
// Lazily marshals a random source. Written this way instead of using a
// constructor so that DefaultRetryPolicy can easily be embedded in other
// structs without special initialization.
func (p *DefaultClientRetryPolicy) lazilyMarshalRand() {
{
p.randMu.RLock()
pRand := p.rand
p.randMu.RUnlock()
if pRand != nil {
return
}
}
p.randMu.Lock()
defer p.randMu.Unlock()
// One additional nil check in case multiple goroutines raced to this point.
if p.rand != nil {
return
}
p.rand = randutil.NewCryptoSeededConcurrentSafeRand()
}
// Gets a number of retry seconds for the given attempt, random jitter included.
func (p *DefaultClientRetryPolicy) retrySeconds(attempt int) float64 {
p.lazilyMarshalRand()
retrySeconds := p.retrySecondsWithoutJitter(attempt)
// Jitter number of seconds +/- 10%.
retrySeconds += retrySeconds * (p.rand.Float64()*0.2 - 0.1)
return retrySeconds
}
// Gets a base number of retry seconds for the given attempt, jitter excluded.
func (p *DefaultClientRetryPolicy) retrySecondsWithoutJitter(attempt int) float64 {
return math.Pow(float64(attempt), 4)
}