API-side query caching for job counts
This one's related to trying to find a solution for #106. After messing
around with query plans a lot on the "count by state" query, I came to
the conclusion in the end that Postgres might actually be doing the
right thing by falling back to a sequential scan, or at least only the
minimally wrong thing. Even forcing the count query to run against a
well-used index is a fairly slow operation when there are many jobs in
the database. It's hard to provide specifics because caching affects the
result so much (so running the same query twice in a row can produce
vastly different timings), but I've seen the index version take _longer_
than the seq scan in some cases.

So here, I'm proposing a radically different solution in which we add
some infrastructure to the River UI API server that lets it run slow
queries periodically in the background, then have API endpoints take
advantage of those cached results instead of having to run each
operation themselves, thereby making their responses ~instant.

I've written it such that this caching only kicks in when we know we're
working with a very large data set where it actually matters (currently
defined as > 1M rows), with the idea being that for smaller databases
we'll continue to run queries in-band so that results look as fresh and
real-time as possible.
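
A minimal sketch of that decision, assuming the server has some estimate of the table size on hand. How the row count is obtained isn't shown in this message, so `approxRows` and `cacheReady` are inputs invented for this example, as is the function name.

```go
package main

import "fmt"

// cacheRowThreshold mirrors the "> 1M rows" cutoff mentioned above.
const cacheRowThreshold = 1_000_000

// useCachedCounts reports whether an endpoint should serve counts from the
// background cache instead of querying in-band. Both inputs are assumptions
// of this sketch: approxRows is some estimate of the jobs table size, and
// cacheReady says whether a background run has produced a result yet.
func useCachedCounts(approxRows int64, cacheReady bool) bool {
	return cacheReady && approxRows > cacheRowThreshold
}

func main() {
	fmt.Println(useCachedCounts(50_000, true))     // small table: query in-band
	fmt.Println(useCachedCounts(3_000_000, true))  // large table: use the cache
	fmt.Println(useCachedCounts(3_000_000, false)) // large table, cache not primed yet
}
```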

To support this, I've had to make some changes to the River UI API
server/handler so that it has a `Start` function that can be invoked to
start background utilities like the query cache. It's a considerable
change, but I think it leaves us in a more sustainable place API-wise
because we may want to add other background utilities later on, and
returning an `http.Handler` isn't enough: even if you were to
start goroutines from `NewHandler`, there'd be no way to stop them
again, which is far from ideal (and problematic for anything that
wants to check for leaks with goleak).

I'm also going to propose that we increase the default API endpoint
timeout from 5 seconds to 10 seconds. When I load in 3 to 5 million job
rows, I see count queries taking right around 3 to 5 seconds.
Since the original number of 5 seconds was a little arbitrary anyway, it
can't hurt to give those queries a little more leeway. A problem that
could still occur even with my proposal here is that if a user starts
River UI and then immediately hits the UI, there won't be a cached
result yet, and therefore the count query will go to the database
directly, and that may still cause a timeout at 5 seconds.

I've only applied caching to the count query so far, but I've written
the `QueryCacher` code such that it can cleanly support other queries if
we care to add them.
brandur committed Aug 7, 2024
1 parent 93b4f53 commit 6baf4d6
Showing 12 changed files with 576 additions and 111 deletions.
19 changes: 13 additions & 6 deletions cmd/riverui/main.go
@@ -29,15 +29,17 @@ func main() {
if err := godotenv.Load(); err != nil {
fmt.Printf("No .env file detected, using environment variables\n")
}
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))

if os.Getenv("RIVER_DEBUG") == "1" || os.Getenv("RIVER_DEBUG") == "true" {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
} else {
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
}

os.Exit(initAndServe(ctx))
}

func initAndServe(ctx context.Context) int {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var pathPrefix string
flag.StringVar(&pathPrefix, "prefix", "/", "path prefix to use for the API and UI HTTP requests")
flag.Parse()
@@ -82,13 +84,18 @@ func initAndServe(ctx context.Context) int {
Prefix: pathPrefix,
}

handler, err := riverui.NewHandler(handlerOpts)
server, err := riverui.NewServer(handlerOpts)
if err != nil {
logger.ErrorContext(ctx, "error creating handler", slog.String("error", err.Error()))
return 1
}

logHandler := sloghttp.Recovery(handler)
if err := server.Start(ctx); err != nil {
logger.ErrorContext(ctx, "error starting UI server", slog.String("error", err.Error()))
return 1
}

logHandler := sloghttp.Recovery(server.Handler())
config := sloghttp.Config{
WithSpanID: otelEnabled,
WithTraceID: otelEnabled,
4 changes: 3 additions & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/riverqueue/riverui

go 1.22
go 1.22.5

require (
github.com/go-playground/validator/v10 v10.22.0
@@ -11,6 +11,7 @@ require (
github.com/riverqueue/river v0.8.0
github.com/riverqueue/river/riverdriver v0.8.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.8.0
github.com/riverqueue/river/rivershared v0.11.1
github.com/riverqueue/river/rivertype v0.8.0
github.com/rs/cors v1.11.0
github.com/samber/slog-http v1.0.0
@@ -29,6 +30,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.7.0 // indirect
6 changes: 4 additions & 2 deletions go.sum
@@ -45,12 +45,14 @@ github.com/riverqueue/river/riverdriver/riverdatabasesql v0.8.0 h1:eH6kkU8qstq1R
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.8.0/go.mod h1:4jXPB30TNOWSeOvNvk1Mdov4XIMTBCnIzysrdAXizzs=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.8.0 h1:9lF2GQIU0Z5gynaY6kevJwW5ycy/VbH9S/iYu0+Lf7U=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.8.0/go.mod h1:rPTUHOdsrQIEyeEesEaBzNyj0Hs4VtXGUHHPC4JwgZ0=
github.com/riverqueue/river/rivershared v0.11.1 h1:5HDZ5fPrHf68lrE2CTTTUfRfdCmfW1G6P/v0zCvor7I=
github.com/riverqueue/river/rivershared v0.11.1/go.mod h1:2egnQ7czNcW8IXKXMRjko0aEMrQzF4V3k3jddmYiihE=
github.com/riverqueue/river/rivertype v0.8.0 h1:Ys49e1AECeIOTxRquXC446uIEPXiXLMNVKD4KwexJPM=
github.com/riverqueue/river/rivertype v0.8.0/go.mod h1:nDd50b/mIdxR/ezQzGS/JiAhBPERA7tUIne21GdfspQ=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po=
github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/samber/slog-http v1.0.0 h1:KjxyJm2lOsuWBt904A04qvrp+0ZvOfwDnk6jI8h7/5c=
101 changes: 85 additions & 16 deletions handler.go
@@ -14,6 +14,9 @@ import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/riverui/internal/apiendpoint"
"github.com/riverqueue/riverui/internal/apimiddleware"
"github.com/riverqueue/riverui/internal/dbsqlc"
@@ -62,8 +65,14 @@ func normalizePathPrefix(prefix string) string {
return prefix
}

// NewHandler creates a new http.Handler that serves the River UI and API.
func NewHandler(opts *HandlerOpts) (http.Handler, error) {
type Server struct {
baseStartStop startstop.BaseStartStop
handler http.Handler
services []startstop.Service
}

// NewServer creates a new Server whose Handler serves the River UI and API.
func NewServer(opts *HandlerOpts) (*Server, error) {
if opts == nil {
return nil, errors.New("opts is required")
}
@@ -80,6 +89,12 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
serveIndex := serveFileContents("index.html", httpFS)

apiBundle := apiBundle{
// TODO: Switch to baseservice.NewArchetype when available.
archetype: &baseservice.Archetype{
Logger: opts.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
Time: &baseservice.UnStubbableTimeGenerator{},
},
client: opts.Client,
dbPool: opts.DBPool,
logger: opts.Logger,
@@ -88,19 +103,35 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
prefix := opts.Prefix

mux := http.NewServeMux()
apiendpoint.Mount(mux, opts.Logger, &healthCheckGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobCancelEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobDeleteEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobListEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobRetryEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &jobGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queueGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queueListEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queuePauseEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &queueResumeEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &stateAndCountGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &workflowGetEndpoint{apiBundle: apiBundle})
apiendpoint.Mount(mux, opts.Logger, &workflowListEndpoint{apiBundle: apiBundle})

endpoints := []apiendpoint.EndpointInterface{
apiendpoint.Mount(mux, opts.Logger, newHealthCheckGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobCancelEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobDeleteEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobListEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobRetryEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueListEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueuePauseEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueResumeEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newStateAndCountGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newWorkflowGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newWorkflowListEndpoint(apiBundle)),
}

var services []startstop.Service

type WithSubServices interface {
SubServices() []startstop.Service
}

// If any endpoints have start/stop subservices, collect them so that
// Start can start them later.
for _, endpoint := range endpoints {
if withSubServices, ok := endpoint.(WithSubServices); ok {
services = append(services, withSubServices.SubServices()...)
}
}

if err := mountStaticFiles(opts.Logger, mux); err != nil {
return nil, err
@@ -115,7 +146,45 @@ func NewHandler(opts *HandlerOpts) (http.Handler, error) {
middlewareStack.Use(&stripPrefixMiddleware{prefix})
}

return middlewareStack.Mount(mux), nil
server := &Server{
handler: middlewareStack.Mount(mux),
services: services,
}

return server, nil
}

// Handler returns an http.Handler that can be mounted to serve HTTP requests.
func (s *Server) Handler() http.Handler { return s.handler }

// Start starts the server's background services. Notably, this does _not_ cause
// the server to start listening for HTTP in any way. To do that, call Handler
// and mount or run it using Go's built in `net/http`.
func (s *Server) Start(ctx context.Context) error {
ctx, shouldStart, started, stopped := s.baseStartStop.StartInit(ctx)
if !shouldStart {
return nil
}

for _, service := range s.services {
if err := service.Start(ctx); err != nil {
return err
}
}

go func() {
// Wait for all subservices to start up before signaling our own start.
startstop.WaitAllStarted(s.services...)

started()
defer stopped() // this defer should come first so it's last out

<-ctx.Done()

startstop.StopAllParallel(s.services...)
}()

return nil
}

//go:embed public