Skip to content

Commit

Permalink
Add river bench benchmarking tool producing succinct output + summa…
Browse files Browse the repository at this point in the history
…ry (#254)

Here, add a new benchmarking tool to the main River CLI. We had an
existing one, but it hasn't been used or updated in ages, and was
written quite quickly, without too much concern for UX.

This `river bench` is user-runnable, and designed to produce output
that's succinct and easily comprehensible. Every two seconds it produces
a new line of output with the number of jobs worked during that period,
number of jobs inserted during that period, and the rough jobs per
second being complete. When the program is interrupted via `SIGINT`, it
produces one final log line indicating similar information, but
calculated across the entire run period.

    $ go run main.go bench --database-url $DATABASE_URL
    bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
    bench: jobs worked [      22445 ], inserted [      22000 ], job/sec [    11222.5 ] [2s]
    bench: jobs worked [      26504 ], inserted [      28000 ], job/sec [    13252.0 ] [2s]
    bench: jobs worked [      25919 ], inserted [      24000 ], job/sec [    12959.5 ] [2s]
    bench: jobs worked [      27432 ], inserted [      28000 ], job/sec [    13716.0 ] [2s]
    bench: jobs worked [      26068 ], inserted [      26000 ], job/sec [    13034.0 ] [2s]
    bench: jobs worked [      27068 ], inserted [      28000 ], job/sec [    13534.0 ] [2s]
    bench: jobs worked [      27876 ], inserted [      28000 ], job/sec [    13938.0 ] [2s]
    bench: jobs worked [      25058 ], inserted [      24000 ], job/sec [    12529.0 ] [2s]
    ^Cbench: total jobs worked [     214356 ], total jobs inserted [     264000 ], overall job/sec [    13026.7 ], running 16.455185125s

It can also run with a total duration, which will be useful if we're
trying to compare runs across branches without having to try and time it
artificially:

    $ go run main.go bench --database-url $DATABASE_URL --duration 30s
    bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
    bench: jobs worked [      23875 ], inserted [      24000 ], job/sec [    11937.5 ] [2s]
    bench: jobs worked [      27964 ], inserted [      28000 ], job/sec [    13982.0 ] [2s]
    bench: jobs worked [      25694 ], inserted [      26000 ], job/sec [    12847.0 ] [2s]
    bench: jobs worked [      26649 ], inserted [      26000 ], job/sec [    13324.5 ] [2s]
    bench: jobs worked [      26872 ], inserted [      28000 ], job/sec [    13436.0 ] [2s]
    bench: jobs worked [      26519 ], inserted [      26000 ], job/sec [    13259.5 ] [2s]
    bench: jobs worked [      25077 ], inserted [      24000 ], job/sec [    12538.5 ] [2s]
    bench: jobs worked [      24126 ], inserted [      26000 ], job/sec [    12063.0 ] [2s]
    bench: jobs worked [      23936 ], inserted [      22000 ], job/sec [    11968.0 ] [2s]
    bench: jobs worked [      26044 ], inserted [      28000 ], job/sec [    13022.0 ] [2s]
    bench: jobs worked [      26289 ], inserted [      26000 ], job/sec [    13144.5 ] [2s]
    bench: jobs worked [      23058 ], inserted [      22000 ], job/sec [    11529.0 ] [2s]
    bench: jobs worked [      23474 ], inserted [      24000 ], job/sec [    11737.0 ] [2s]
    bench: jobs worked [      25380 ], inserted [      26000 ], job/sec [    12690.0 ] [2s]
    bench: total jobs worked [     375743 ], total jobs inserted [     426000 ], overall job/sec [    12524.8 ], running 30.000017167s

Unlike the old benchmarking tool, we switch this one over to do job
accounting using a client subscribe channel instead of measuring it in
the worker. Measuring in the worker doesn't account for the time needed
to block in the job executor waiting for a goroutine to become available
in the completer to complete a job, making it less accurate and possibly
prone to memory overruns as a large backlog of jobs have been accounted
as completed but are actually waiting for a completer slot.

I'm not going to call this feature complete, but I think it's a step in
the right direction, and the hope is that it'll give us a reasonable way
to gutcheck new changes and see whether they cause an obvious regression
or improvement in total performance.
  • Loading branch information
brandur authored Mar 10, 2024
1 parent 436455e commit fbd907f
Show file tree
Hide file tree
Showing 7 changed files with 530 additions and 208 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ jobs:
run: ./river validate --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river bench
run: |
( sleep 10 && killall -SIGTERM river ) &
./river bench --database-url $DATABASE_URL
working-directory: ./cmd/river

# Bench again in fixed number of jobs mode.
- name: river bench
run: |
( sleep 10 && killall -SIGTERM river ) &
./river bench --database-url $DATABASE_URL --num-total-jobs 1_234
working-directory: ./cmd/river

- name: river migrate-down
run: ./river migrate-down --database-url $DATABASE_URL --max-steps 100
working-directory: ./cmd/river
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The River CLI now supports `river bench` to benchmark River's job throughput against a database. [PR #254](https://github.com/riverqueue/river/pull/254).

### Changed

- Changed default client IDs to be a combination of hostname and the time which the client started. This can still be changed by specifying `Config.ID`. [PR #255](https://github.com/riverqueue/river/pull/255).
Expand Down
4 changes: 3 additions & 1 deletion cmd/river/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go 1.21.4
require (
github.com/jackc/pgx/v5 v5.5.2
github.com/riverqueue/river v0.0.17
github.com/riverqueue/river/riverdriver v0.0.17
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17
github.com/spf13/cobra v1.8.0
)
Expand All @@ -22,9 +23,10 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/riverqueue/river/riverdriver v0.0.17 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
5 changes: 5 additions & 0 deletions cmd/river/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.0.17 h1:7beHZxo1WMzhN48y1Jt7CKkkmsw+TjuLd6qCEaznm7s=
Expand All @@ -26,6 +29,8 @@ github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17 h1:xPmTpQNBicTZ
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17/go.mod h1:zlZKXZ6XHcbwYniSKWX2+GwFlXHTnG9pJtE/BkxK0Xc=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17 h1:iuruCNT7nkC7Z4Qzb79jcvAVniGyK+Kstsy7fKJagUU=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17/go.mod h1:kL59NW3LoPbQxPz9DQoUtDYq3Zkcpdt5CIowgeBZwFw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand Down
86 changes: 81 additions & 5 deletions cmd/river/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cobra"

"github.com/riverqueue/river/cmd/river/riverbench"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand Down Expand Up @@ -39,14 +41,46 @@ Provides command line facilities for the River job queue.
}
}

mustMarkFlagRequired := func(cmd *cobra.Command, name string) {
mustMarkFlagRequired := func(cmd *cobra.Command, name string) { //nolint:unparam
// We just panic here because this will never happen outside of an error
// in development.
if err := cmd.MarkFlagRequired(name); err != nil {
panic(err)
}
}

// bench
{
var opts benchOpts

cmd := &cobra.Command{
Use: "bench",
Short: "Run River benchmark",
Long: `
Run a River benchmark which inserts and works jobs continually, giving a rough
idea of jobs per second and time to work a single job.
By default, the benchmark will continuously insert and work jobs in perpetuity
until interrupted by SIGINT (Ctrl^C). It can alternatively take a maximum run
duration with --duration, which takes a Go-style duration string like 1m.
Lastly, it can take --num-total-jobs, which inserts the given number of jobs
before starting the client, and works until all jobs are finished.
The database in --database-url will have its jobs table truncated, so make sure
to use a development database only.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() (bool, error) { return bench(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`")
cmd.Flags().DurationVar(&opts.Duration, "duration", 0, "duration after which to stop benchmark, accepting Go-style durations like 1m, 5m30s")
cmd.Flags().IntVarP(&opts.NumTotalJobs, "num-total-jobs", "n", 0, "number of jobs to insert before starting and which are worked down until finish")
cmd.Flags().BoolVarP(&opts.Verbose, "verbose", "v", false, "output additional logging verbosity")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

// migrate-down
{
var opts migrateDownOpts
Expand All @@ -65,8 +99,8 @@ Defaults to running a single down migration. This behavior can be changed with
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "Maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "Target version to migrate to (final state includes this version, but none after it)")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version, but none after it)")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}
Expand All @@ -89,8 +123,8 @@ restricted with --max-steps or --target-version.
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "Maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "Target version to migrate to (final state includes this version)")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version)")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}
Expand Down Expand Up @@ -151,6 +185,48 @@ func setParamIfUnset(runtimeParams map[string]string, name, val string) {
runtimeParams[name] = val
}

type benchOpts struct {
DatabaseURL string
Duration time.Duration
NumTotalJobs int
Verbose bool
}

func (o *benchOpts) validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
}

return nil
}

func bench(ctx context.Context, opts *benchOpts) (bool, error) {
if err := opts.validate(); err != nil {
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return false, err
}
defer dbPool.Close()

var logger *slog.Logger
if opts.Verbose {
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
} else {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn}))
}

benchmarker := riverbench.NewBenchmarker(riverpgxv5.New(dbPool), logger, opts.Duration, opts.NumTotalJobs)

if err := benchmarker.Run(ctx); err != nil {
return false, err
}

return true, nil
}

type migrateDownOpts struct {
DatabaseURL string
MaxSteps int
Expand Down
Loading

0 comments on commit fbd907f

Please sign in to comment.