Skip to content

Commit

Permalink
Merge pull request #83 from hansmi/retryflag1
Browse files Browse the repository at this point in the history
Implement flags to control retry delays
  • Loading branch information
SuperQ authored Jan 13, 2021
2 parents a68733f + fa732d0 commit 5cce876
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 49 deletions.
64 changes: 26 additions & 38 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
Expand All @@ -32,14 +31,15 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/ShowMax/go-fqdn"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus-community/pushprox/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus-community/pushprox/util"
)

var (
Expand All @@ -49,6 +49,9 @@ var (
tlsCert = kingpin.Flag("tls.cert", "<cert> Client certificate file").String()
tlsKey = kingpin.Flag("tls.key", "<key> Private key file").String()
metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()

retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
)

var (
Expand Down Expand Up @@ -76,6 +79,15 @@ func init() {
prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter)
}

func newBackOffFromFlags() backoff.BackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = *retryInitialWait
b.Multiplier = 1.5
b.MaxInterval = *retryMaxWait
b.MaxElapsedTime = time.Duration(0)
return b
}

// Coordinator for scrape requests and responses
type Coordinator struct {
logger log.Logger
Expand Down Expand Up @@ -168,7 +180,7 @@ func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request, cli
return nil
}

func loop(c Coordinator, client *http.Client) error {
func (c *Coordinator) doPoll(client *http.Client) error {
base, err := url.Parse(*proxyURL)
if err != nil {
level.Error(c.logger).Log("msg", "Error parsing url:", "err", err)
Expand Down Expand Up @@ -201,35 +213,18 @@ func loop(c Coordinator, client *http.Client) error {
return nil
}

// decorrelated Jitter increases the maximum jitter based on the last random value.
type decorrelatedJitter struct {
duration time.Duration // sleep time
min time.Duration // min sleep time
cap time.Duration // max sleep time
}

func newJitter() decorrelatedJitter {
rand.Seed(time.Now().UnixNano())
return decorrelatedJitter{
min: 50 * time.Millisecond,
cap: 5 * time.Second,
func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) {
op := func() error {
return c.doPoll(client)
}
}

func (d *decorrelatedJitter) calc() time.Duration {
change := rand.Float64() * float64(d.duration*time.Duration(3)-d.min)
d.duration = d.min + time.Duration(change)
if d.duration > d.cap {
d.duration = d.cap
}
if d.duration < d.min {
d.duration = d.min
for {
if err := backoff.RetryNotify(op, bo, func(err error, _ time.Duration) {
pollErrorCounter.Inc()
}); err != nil {
level.Error(c.logger).Log("err", err)
}
}
return d.duration
}

func (d *decorrelatedJitter) sleep() {
time.Sleep(d.calc())
}

func main() {
Expand Down Expand Up @@ -299,14 +294,7 @@ func main() {
TLSClientConfig: tlsConfig,
}

jitter := newJitter()
client := &http.Client{Transport: transport}
for {
err := loop(coordinator, client)
if err != nil {
pollErrorCounter.Inc()
jitter.sleep()
continue
}
}

coordinator.loop(newBackOffFromFlags(), client)
}
12 changes: 1 addition & 11 deletions cmd/client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ import (
"github.com/pkg/errors"
)

func TestJitter(t *testing.T) {
jitter := newJitter()
for i := 0; i < 100000; i++ {
duration := jitter.calc()
if !(jitter.min <= duration || duration <= jitter.cap) {
t.Fatal("invalid jitter value: ", duration)
}
}
}

type TestLogger struct{}

func (tl *TestLogger) Log(vars ...interface{}) error {
Expand Down Expand Up @@ -76,7 +66,7 @@ func TestHandleErr(t *testing.T) {
func TestLoop(t *testing.T) {
ts, c := prepareTest()
defer ts.Close()
if err := loop(c, ts.Client()); err != nil {
if err := c.doPoll(ts.Client()); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/ShowMax/go-fqdn v0.0.0-20180501083314-6f60894d629f
github.com/cenkalti/backoff/v4 v4.1.0
github.com/go-kit/kit v0.10.0
github.com/google/uuid v1.1.1
github.com/pkg/errors v0.9.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down

0 comments on commit 5cce876

Please sign in to comment.