Skip to content

Commit

Permalink
API-side query caching for job counts
Browse files Browse the repository at this point in the history
This one's related to trying to find a solution for #106. After messing
around with query plans a lot on the "count by state" query, I came to
the conclusion in the end that Postgres might actually be doing the
right thing by falling back to a sequential scan, or at least only the
minimally wrong thing. Even forcing the count query to run against a
well-used index is a fairly slow operation when there are many jobs in
the database. It's hard to provide specifics because caching affects the
result so much (so running the same query twice in a row can produce
vastly different timings), but I've seen the index version take _longer_
than the seq scan in some cases.

So here, I'm proposing a radically different solution in which we add
some infrastructure to the River UI API server that lets it run slow
queries periodically in the background, then have API endpoints take
advantage of those cached results instead of having to run each
operation themselves, thereby making their responses ~instant.

I've written it such that this caching only kicks in when we know we're
working with a very large data set where it actually matters (currently
defined as > 1M rows), with the idea being that for smaller databases
we'll continue to run queries in-band so that results look as fresh and
real-time as possible.

To support this, I've had to make some changes to the River UI API
server/handler so that it has a `Start` function that can be invoked to
start background utilities like the query cache. It's a considerable
change, but I think it leaves us in a more sustainable place API-wise
because we may want to add other background utilities later on, and
returning an `http.Handler` isn't enough because even if you were to
start goroutines from `NewHandler`, it's very, very not ideal that
there's no way to stop those goroutines again (problematic for anything
that wants to check for leaks with goleak).

I'm also going to propose that we increase the default API endpoint
timeout from 5 seconds to 10 seconds. When I load in 3 to 5 million job
rows, I see count queries taking right around that 3 to 5 seconds range.
Since the original number of 5 seconds was a little arbitrary anyway, it
can't hurt to give those queries a little more leeway. A problem that
could still occur even with my proposal here is that if a user starts
River UI and then immediately hits the UI, there won't be a cached
results yet, and therefore the count query will go to the database
directly, and that may still cause a timeout at 5 seconds.

I've only applied caching to the count timeout so far, but I've written
the `QueryCacher` code such that it can cleanly support other queries if
we care to add them.
  • Loading branch information
brandur committed Aug 13, 2024
1 parent e58688a commit 33f2b33
Show file tree
Hide file tree
Showing 12 changed files with 595 additions and 131 deletions.
19 changes: 13 additions & 6 deletions cmd/riverui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ func main() {
if err := godotenv.Load(); err != nil {
fmt.Printf("No .env file detected, using environment variables\n")
}
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))

if os.Getenv("RIVER_DEBUG") == "1" || os.Getenv("RIVER_DEBUG") == "true" {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
} else {
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
}

os.Exit(initAndServe(ctx))
}

func initAndServe(ctx context.Context) int {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var pathPrefix string
flag.StringVar(&pathPrefix, "prefix", "/", "path prefix to use for the API and UI HTTP requests")
flag.Parse()
Expand Down Expand Up @@ -82,13 +84,18 @@ func initAndServe(ctx context.Context) int {
Prefix: pathPrefix,
}

handler, err := riverui.NewHandler(handlerOpts)
server, err := riverui.NewServer(handlerOpts)
if err != nil {
logger.ErrorContext(ctx, "error creating handler", slog.String("error", err.Error()))
return 1
}

logHandler := sloghttp.Recovery(handler)
if err := server.Start(ctx); err != nil {
logger.ErrorContext(ctx, "error starting UI server", slog.String("error", err.Error()))
return 1
}

logHandler := sloghttp.Recovery(server.Handler())
config := sloghttp.Config{
WithSpanID: otelEnabled,
WithTraceID: otelEnabled,
Expand Down
17 changes: 10 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
module github.com/riverqueue/riverui

go 1.22.5
go 1.21

toolchain go1.22.5

require (
github.com/go-playground/validator/v10 v10.22.0
github.com/google/uuid v1.6.0
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pgx/v5 v5.6.0
github.com/joho/godotenv v1.5.1
github.com/riverqueue/river v0.8.0
github.com/riverqueue/river/riverdriver v0.8.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.8.0
github.com/riverqueue/river/rivertype v0.8.0
github.com/riverqueue/river v0.11.2
github.com/riverqueue/river/riverdriver v0.11.2
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.2
github.com/riverqueue/river/rivershared v0.11.2
github.com/riverqueue/river/rivertype v0.11.2
github.com/rs/cors v1.11.0
github.com/samber/slog-http v1.0.0
github.com/stretchr/testify v1.9.0
Expand All @@ -27,12 +30,12 @@ require (
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river/rivershared v0.11.1 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.16.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
32 changes: 16 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.8.0 h1:IBUIP9eZX/dkLQ3T+XNNk0Zi7iyUksZd4aHxQIFChOQ=
github.com/riverqueue/river v0.8.0/go.mod h1:EHRbhqVXDpXQizFh4lndwswu53N0txITrLM2y3vOIF4=
github.com/riverqueue/river/riverdriver v0.8.0 h1:vSeIvf2Z+/hHH4QF1NK/rvzuZJeZZ+voHz55ZPf9efA=
github.com/riverqueue/river/riverdriver v0.8.0/go.mod h1:YZUVae96RsQJaAem0o0EpgD7fDNPdl/qJiuUFh/vkVE=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.8.0 h1:eH6kkU8qstq1Rj7d0PBYmptaZy6vPsea0WzhBf7/SL4=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.8.0/go.mod h1:4jXPB30TNOWSeOvNvk1Mdov4XIMTBCnIzysrdAXizzs=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.8.0 h1:9lF2GQIU0Z5gynaY6kevJwW5ycy/VbH9S/iYu0+Lf7U=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.8.0/go.mod h1:rPTUHOdsrQIEyeEesEaBzNyj0Hs4VtXGUHHPC4JwgZ0=
github.com/riverqueue/river/rivershared v0.11.1 h1:5HDZ5fPrHf68lrE2CTTTUfRfdCmfW1G6P/v0zCvor7I=
github.com/riverqueue/river/rivershared v0.11.1/go.mod h1:2egnQ7czNcW8IXKXMRjko0aEMrQzF4V3k3jddmYiihE=
github.com/riverqueue/river/rivertype v0.8.0 h1:Ys49e1AECeIOTxRquXC446uIEPXiXLMNVKD4KwexJPM=
github.com/riverqueue/river/rivertype v0.8.0/go.mod h1:nDd50b/mIdxR/ezQzGS/JiAhBPERA7tUIne21GdfspQ=
github.com/riverqueue/river v0.11.2 h1:U1f0xZ+B3qdOJSHJ8A2c93CEsFQGGkbG4ZN8blUas5g=
github.com/riverqueue/river v0.11.2/go.mod h1:0MCkMUIjwAjkKAmcWEbHP1IKWiXq+Z3iNVK5dsYVQYY=
github.com/riverqueue/river/riverdriver v0.11.2 h1:2xC+R0Y+CFEOSDWKyeFef0wqQLuvhk3PsLkos7MLa1w=
github.com/riverqueue/river/riverdriver v0.11.2/go.mod h1:RhMuAjEtNGexwOFnz445G1iFNZVOnYQ90HDYxHMI+jM=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.2 h1:I4ye1YEa35kqB6Jd3xVPNxbGDL6S1gpSTkZu25qffhc=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.11.2/go.mod h1:+cOcD4U+8ugUeRZVTGqVhtScy0FS7LPyp+ZsoPIeoMI=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.2 h1:yxFi09ECN02iAr2uO0n7QhFKAyyGZ+Rn9fzKTt2TGhk=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.11.2/go.mod h1:ajPqIw7OgYBfR24MqH3VGI/SiYVgq0DkvdM7wrs+uDA=
github.com/riverqueue/river/rivershared v0.11.2 h1:VbuLE6zm68R24xBi1elfnerhLBBn6X7DUxR9j4mcTR4=
github.com/riverqueue/river/rivershared v0.11.2/go.mod h1:J4U3qm8MbjHY1o5OlRNiWaminYagec1o8sHYX4ZQ4S4=
github.com/riverqueue/river/rivertype v0.11.2 h1:YREWOGxDMDe1DTdvttwr2DVq/ql65u6e4jkw3VxuNyU=
github.com/riverqueue/river/rivertype v0.11.2/go.mod h1:bm5EMOGAEWhtXKqo27POWnViqSD5nHMZDP/jsrJc530=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po=
github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/samber/slog-http v1.0.0 h1:KjxyJm2lOsuWBt904A04qvrp+0ZvOfwDnk6jI8h7/5c=
Expand All @@ -72,8 +72,8 @@ golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
Expand Down
101 changes: 85 additions & 16 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/riverui/internal/apiendpoint"
"github.com/riverqueue/riverui/internal/apimiddleware"
"github.com/riverqueue/riverui/internal/dbsqlc"
Expand Down Expand Up @@ -62,8 +65,14 @@ func normalizePathPrefix(prefix string) string {
return prefix
}

// NewHandler creates a new http.Handler that serves the River UI and API.
func NewHandler(opts *HandlerOpts) (http.Handler, error) {
type Server struct {
baseStartStop startstop.BaseStartStop
handler http.Handler
services []startstop.Service
}

// NewServer creates a new http.Handler that serves the River UI and API.
func NewServer(opts *HandlerOpts) (*Server, error) {
if opts == nil {
return nil, errors.New("opts is required")
}
Expand All @@ -80,6 +89,12 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
serveIndex := serveFileContents("index.html", httpFS)

apiBundle := apiBundle{
// TODO: Switch to baseservice.NewArchetype when available.
archetype: &baseservice.Archetype{
Logger: opts.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
Time: &baseservice.UnStubbableTimeGenerator{},
},
client: opts.Client,
dbPool: opts.DBPool,
logger: opts.Logger,
Expand All @@ -88,19 +103,35 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
prefix := opts.Prefix

mux := http.NewServeMux()
apiendpoint.Mount(mux, opts.Logger, &healthCheckGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobCancelEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobDeleteEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobListEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobRetryEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queueGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queueListEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queuePauseEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queueResumeEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &stateAndCountGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &workflowGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &workflowListEndpoint{apiBundle: apiBundle})

endpoints := []apiendpoint.EndpointInterface{
apiendpoint.Mount(mux, opts.Logger, newHealthCheckGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobCancelEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobDeleteEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobListEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobRetryEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueListEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueuePauseEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueResumeEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newStateAndCountGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newWorkflowGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newWorkflowListEndpoint(apiBundle)),
}

var services []startstop.Service

type WithSubServices interface {
SubServices() []startstop.Service
}

// If any endpoints are start/stop services, start them up.
for _, endpoint := range endpoints {
if withSubServices, ok := endpoint.(WithSubServices); ok {
services = append(services, withSubServices.SubServices()...)
}
}

if err := mountStaticFiles(opts.Logger, mux); err != nil {
return nil, err
Expand All @@ -115,7 +146,45 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
middlewareStack.Use(&stripPrefixMiddleware{prefix})
}

return middlewareStack.Mount(mux), nil
server := &Server{
handler: middlewareStack.Mount(mux),
services: services,
}

return server, nil
}

// Handler returns an http.Handler that can be mounted to serve HTTP requests.
func (s *Server) Handler() http.Handler { return s.handler }

// Start starts the server's background services. Notably, this does _not_ cause
// the server to start listening for HTTP in any way. To do that, call Handler
// and mount or run it using Go's built in `net/http`.
func (s *Server) Start(ctx context.Context) error {
ctx, shouldStart, started, stopped := s.baseStartStop.StartInit(ctx)
if !shouldStart {
return nil
}

for _, service := range s.services {
if err := service.Start(ctx); err != nil {
return err
}
}

go func() {
// Wait for all subservices to start up before signaling our own start.
startstop.WaitAllStarted(s.services...)

started()
defer stopped() // this defer should come first so it's last out

<-ctx.Done()

startstop.StopAllParallel(s.services...)
}()

return nil
}

//go:embed public
Expand Down
Loading

0 comments on commit 33f2b33

Please sign in to comment.