Skip to content

Commit

Permalink
Adding command line args for Idle Timeout (#21)
Browse files Browse the repository at this point in the history
* introducing command line parameters

* adding statsd calls

* go.mod

---------

Co-authored-by: Jeff Saremi <[email protected]>
  • Loading branch information
jeffsaremi and jeffsaremi authored Apr 11, 2023
1 parent 7a0f09b commit d5727cf
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 125 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ Usage: bin/redisbetween [OPTIONS] uri1 [uri2] ...
-healthcheck
start a background process to check the health of server connections
-healthcheckcycle
number of seconds after which the healthcheck process should repeat itself (default 60s)
duration after which the healthcheck process should repeat itself (default 60s)
-healthcheckthreshold
count of consecutive healthcheck failures after which a server is declared unhealthy (default 3)
-idletimeout
how long can an inactive connection remain idle in the pool (default 0 meaning no timeout)
```

Each URI can specify the following settings as GET params:
Expand All @@ -121,5 +123,6 @@ Each URI can specify the following settings as GET params:
- `readonly` every connection issues a [READONLY](https://redis.io/commands/readonly) command before entering the pool. Defaults to false
- `maxsubscriptions` sets the max number of channels that can be subscribed to at one time. Defaults to 1.
- `maxblockers` sets the max number of commands that can be blocking at one time. Defaults to 1.
- `idletimeout` how long can an inactive connection remain idle in the pool. Default is the global level `idletimeout` or 0 (no timeout)

Example: `./redisbetween -unlink -pretty -loglevel debug redis://localhost:7001?maxsubscriptions=2&maxblockers=2`
87 changes: 49 additions & 38 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,26 @@ import (
)

const defaultStatsdAddress = "localhost:8125"
const defaultHealthCheckCycle = 60 * time.Second
const defaultHealthCheckThreshold = 3

var validNetworks = []string{"tcp", "tcp4", "tcp6", "unix", "unixpacket"}

type Config struct {
Network string
LocalSocketPrefix string
LocalSocketSuffix string
Unlink bool
MinPoolSize uint64
MaxPoolSize uint64
Pretty bool
Statsd string
Level zapcore.Level
Upstreams []Upstream
HealthCheck bool
ServerHealthCheckSec uint64
ServerHealthCheckThreshold uint64
Network string
LocalSocketPrefix string
LocalSocketSuffix string
Unlink bool
MinPoolSize uint64
MaxPoolSize uint64
Pretty bool
Statsd string
Level zapcore.Level
Upstreams []Upstream
HealthCheck bool
HealthCheckCycle time.Duration
HealthCheckThreshold int
IdleTimeout time.Duration
}

type Upstream struct {
Expand All @@ -46,6 +49,7 @@ type Upstream struct {
Readonly bool
MaxSubscriptions int
MaxBlockers int
IdleTimeout time.Duration
}

func ParseFlags() *Config {
Expand Down Expand Up @@ -76,7 +80,8 @@ func parseFlags() (*Config, error) {
var network, localSocketPrefix, localSocketSuffix, stats, loglevel string
var pretty, unlink bool
var healthCheck bool
var healthCheckThreshold, healthCheckCycle uint64
var healthCheckThreshold int
var healthCheckCycle, idleTimeout time.Duration
flag.StringVar(&network, "network", "unix", "One of: tcp, tcp4, tcp6, unix or unixpacket")
flag.StringVar(&localSocketPrefix, "localsocketprefix", "/var/tmp/redisbetween-", "Prefix to use for unix socket filenames")
flag.StringVar(&localSocketSuffix, "localsocketsuffix", ".sock", "Suffix to use for unix socket filenames")
Expand All @@ -85,8 +90,9 @@ func parseFlags() (*Config, error) {
flag.BoolVar(&pretty, "pretty", false, "Pretty print logging")
flag.StringVar(&loglevel, "loglevel", "info", "One of: debug, info, warn, error, dpanic, panic, fatal")
flag.BoolVar(&healthCheck, "healthcheck", false, "Start the routine to do health checks on redis servers")
flag.Uint64Var(&healthCheckCycle, "healthcheckcycle", 60, "Integer value for the cycle during which server connections will be health-checked (sec); Must be bigger than healthcheckthreshold * 1sec; default: 60s")
flag.Uint64Var(&healthCheckThreshold, "healthcheckthreshold", 3, "The number of concecutive failures needed to declare a server connection dead; default: 3")
flag.DurationVar(&healthCheckCycle, "healthcheckcycle", defaultHealthCheckCycle, "Duration for the cycle during which server connections will be health-checked; Must be larger than healthcheckthreshold * 1s; default: 60s")
flag.IntVar(&healthCheckThreshold, "healthcheckthreshold", defaultHealthCheckThreshold, "The number of concecutive failures needed to declare a server connection dead; default: 3")
flag.DurationVar(&idleTimeout, "idletimeout", 0, "Timeout value that a connection can remain idle; After this a connection is recreated; default: 0 (no timeout)")

// todo remove these flags in a follow up, after all envs have updated to the new url-param style of timeout config
var obsoleteArg string
Expand Down Expand Up @@ -131,26 +137,18 @@ func parseFlags() (*Config, error) {
return nil, err
}

rt, err := time.ParseDuration(getStringParam(params, "readtimeout", "5s"))
if err != nil {
return nil, err
}
wt, err := time.ParseDuration(getStringParam(params, "writetimeout", "5s"))
if err != nil {
return nil, err
}

us := Upstream{
UpstreamConfigHost: u.Host,
Label: getStringParam(params, "label", ""),
MaxPoolSize: getIntParam(params, "maxpoolsize", 10),
MinPoolSize: getIntParam(params, "minpoolsize", 1),
Database: db,
ReadTimeout: rt,
WriteTimeout: wt,
ReadTimeout: getDurationParam(params, "readtimeout", 5*time.Second),
WriteTimeout: getDurationParam(params, "writetimeout", 5*time.Second),
Readonly: getBoolParam(params, "readonly"),
MaxSubscriptions: getIntParam(params, "maxsubscriptions", 1),
MaxBlockers: getIntParam(params, "maxblockers", 1),
IdleTimeout: getDurationParam(params, "idletimeout", idleTimeout),
}

upstreams = append(upstreams, us)
Expand All @@ -175,17 +173,18 @@ func parseFlags() (*Config, error) {
}

return &Config{
Upstreams: upstreams,
Network: network,
LocalSocketPrefix: localSocketPrefix,
LocalSocketSuffix: localSocketSuffix,
Unlink: unlink,
Pretty: pretty,
Statsd: stats,
Level: level,
HealthCheck: healthCheck,
ServerHealthCheckThreshold: healthCheckThreshold,
ServerHealthCheckSec: healthCheckCycle,
Upstreams: upstreams,
Network: network,
LocalSocketPrefix: localSocketPrefix,
LocalSocketSuffix: localSocketSuffix,
Unlink: unlink,
Pretty: pretty,
Statsd: stats,
Level: level,
HealthCheck: healthCheck,
HealthCheckThreshold: healthCheckThreshold,
HealthCheckCycle: healthCheckCycle,
IdleTimeout: idleTimeout,
}, nil
}

Expand Down Expand Up @@ -222,3 +221,15 @@ func expandEnv(config string) string {
return os.ExpandEnv(s)
})
}

func getDurationParam(v url.Values, key string, def time.Duration) time.Duration {
cl, ok := v[key]
if !ok {
return def
}
d, e := time.ParseDuration(cl[0])
if e != nil {
return def
}
return d
}
98 changes: 98 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,101 @@ func TestMissingAddresses(t *testing.T) {
_, err := parseFlags()
assert.EqualError(t, err, "missing list of upstream hosts")
}

func TestIdleTimeout(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{}
os.Args = []string{
"redisbetween",
"-statsd", "statsd:1234",
"-unlink",
"-idletimeout", "10s",
"redis://localhost:7000/0?minpoolsize=5&maxpoolsize=33&label=cluster1",
}

resetFlags()
c, err := parseFlags()
assert.NoError(t, err)
assert.Equal(t, 10*time.Second, c.IdleTimeout)
}

func TestIdleTimeoutDefault(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{}
os.Args = []string{
"redisbetween",
"-statsd", "statsd:1234",
"-unlink",
"redis://localhost:7000/0?minpoolsize=5&maxpoolsize=33&label=cluster1",
}

resetFlags()
c, err := parseFlags()
assert.NoError(t, err)
assert.Equal(t, time.Duration(0), c.IdleTimeout)
}

func TestIdleTimeoutPerUrl(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{}
os.Args = []string{
"redisbetween",
"-statsd", "statsd:1234",
"-unlink",
"-idletimeout", "10s",
"redis://localhost:7001/0",
"redis://localhost:7002/0?idletimeout=0s",
"redis://localhost:7003/0?idletimeout=30s",
}

resetFlags()
c, err := parseFlags()
assert.NoError(t, err)
assert.Equal(t, 10*time.Second, c.Upstreams[0].IdleTimeout)
assert.Equal(t, time.Duration(0), c.Upstreams[1].IdleTimeout)
assert.Equal(t, 30*time.Second, c.Upstreams[2].IdleTimeout)
}

func TestHealthcheckArgs(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{}
os.Args = []string{
"redisbetween",
"-statsd", "statsd:1234",
"-unlink",
"-healthcheck",
"-healthcheckcycle", "10s",
"-healthcheckthreshold", "5",
"redis://localhost:7000/0?minpoolsize=5&maxpoolsize=33&label=cluster1",
}

resetFlags()
c, err := parseFlags()
assert.NoError(t, err)
assert.True(t, c.HealthCheck)
assert.Equal(t, 10*time.Second, c.HealthCheckCycle)
assert.Equal(t, 5, c.HealthCheckThreshold)
}

func TestHealthcheckDefault(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{}
os.Args = []string{
"redisbetween",
"-statsd", "statsd:1234",
"-unlink",
"redis://localhost:7000/0?minpoolsize=5&maxpoolsize=33&label=cluster1",
}

resetFlags()
c, err := parseFlags()
assert.NoError(t, err)
assert.False(t, c.HealthCheck)
assert.Equal(t, 1*time.Minute, c.HealthCheckCycle)
assert.Equal(t, 3, c.HealthCheckThreshold)
}
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ go 1.13

require (
github.com/DataDog/datadog-go v4.2.0+incompatible
github.com/coinbase/memcachedbetween v0.0.0-20211102164832-e9f6e7c6d80e
github.com/coinbase/memcachedbetween v0.0.3
github.com/coinbase/mongobetween v0.0.9
github.com/go-redis/redis/v8 v8.4.2
github.com/golang/protobuf v1.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mediocregopher/radix/v3 v3.6.0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/stretchr/testify v1.6.1
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20210503173045-b96a97608f20
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4 // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
honnef.co/go/tools v0.0.1-2020.1.5 // indirect
)
Loading

0 comments on commit d5727cf

Please sign in to comment.