From 6bb7e88c25dc9d53e457422ff22a62f659295363 Mon Sep 17 00:00:00 2001
From: Blake Gentry
Date: Tue, 10 Sep 2024 12:17:53 -0500
Subject: [PATCH] add JobInsertManyReturning to drivers

This new query inserts many rows at once (up to ~7280 as of now) while
also returning the new records. While it is not quite as fast as the
`COPY FROM` option when loading many rows, it provides a better UX for
the vast majority of use cases.

sqlc does not support the multirow `VALUES` insert syntax due to the
dynamic nature of the param placeholders, so instead of a `VALUES`
list, the query inserts by selecting from `unnest`ed parallel arrays
(one array per column), which keeps the query compatible with sqlc.
---
 .../riverdrivertest/riverdrivertest.go               | 79 +++++++++++++++-
 riverdriver/river_driver_interface.go                |  1 +
 .../internal/dbsqlc/river_job.sql.go                 | 92 +++++++++++++++++++
 .../river_database_sql_driver.go                     | 48 ++++++++++
 riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql | 28 ++++++
 .../internal/dbsqlc/river_job.sql.go                 | 89 ++++++++++++++++++
 riverdriver/riverpgxv5/river_pgx_v5_driver.go        | 48 ++++++++++
 rivershared/util/sliceutil/slice_util.go             |  9 ++
 rivershared/util/sliceutil/slice_util_test.go        | 15 +++
 9 files changed, 408 insertions(+), 1 deletion(-)
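
A usage sketch for reviewers (notes only, not part of the commit): this is
roughly how a caller is expected to use the new executor method. The function
name, job kind, and args below are hypothetical, and imports are elided.

    // Reviewer sketch: exec is a riverdriver.Executor obtained from a
    // configured driver (e.g. riverpgxv5), as exercised in the driver tests.
    func insertBatchExample(ctx context.Context, exec riverdriver.Executor) error {
        insertParams := []*riverdriver.JobInsertFastParams{{
            EncodedArgs: []byte(`{"customer_id": 123}`), // hypothetical args
            Kind:        "customer_email",               // hypothetical kind
            MaxAttempts: rivercommon.MaxAttemptsDefault,
            Metadata:    []byte(`{}`),
            Priority:    rivercommon.PriorityDefault,
            Queue:       rivercommon.QueueDefault,
            ScheduledAt: nil, // nil is defaulted to now() by the driver
            State:       rivertype.JobStateAvailable,
            Tags:        []string{"email"},
        }}

        // Unlike JobInsertFastMany, which returns only an inserted count, the
        // full rows come back with database-assigned fields (ID, CreatedAt,
        // ...) populated.
        jobRows, err := exec.JobInsertManyReturning(ctx, insertParams)
        if err != nil {
            return err
        }
        for _, row := range jobRows {
            fmt.Printf("inserted job %d in state %s\n", row.ID, row.State)
        }
        return nil
    }
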
diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
index 55ef1b91..06f30c52 100644
--- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
+++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
@@ -16,7 +16,7 @@ import (
     "github.com/riverqueue/river/internal/notifier"
     "github.com/riverqueue/river/internal/rivercommon"
     "github.com/riverqueue/river/riverdriver"
-    "github.com/riverqueue/river/rivershared/testfactory" //nolint:depguard
+    "github.com/riverqueue/river/rivershared/testfactory"
     "github.com/riverqueue/river/rivershared/util/ptrutil"
     "github.com/riverqueue/river/rivershared/util/sliceutil"
     "github.com/riverqueue/river/rivertype"
@@ -1152,6 +1152,83 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         })
     })
 
+    t.Run("JobInsertManyReturning", func(t *testing.T) {
+        t.Parallel()
+
+        t.Run("AllArgs", func(t *testing.T) {
+            exec, _ := setup(ctx, t)
+
+            now := time.Now().UTC()
+
+            insertParams := make([]*riverdriver.JobInsertFastParams, 10)
+            for i := 0; i < len(insertParams); i++ {
+                insertParams[i] = &riverdriver.JobInsertFastParams{
+                    EncodedArgs: []byte(`{"encoded": "args"}`),
+                    Kind:        "test_kind",
+                    MaxAttempts: rivercommon.MaxAttemptsDefault,
+                    Metadata:    []byte(`{"meta": "data"}`),
+                    Priority:    rivercommon.PriorityDefault,
+                    Queue:       rivercommon.QueueDefault,
+                    ScheduledAt: ptrutil.Ptr(now.Add(time.Duration(i) * time.Minute)),
+                    State:       rivertype.JobStateAvailable,
+                    Tags:        []string{"tag"},
+                }
+            }
+
+            jobRows, err := exec.JobInsertManyReturning(ctx, insertParams)
+            require.NoError(t, err)
+            require.Len(t, jobRows, len(insertParams))
+
+            for i, job := range jobRows {
+                require.Equal(t, 0, job.Attempt)
+                require.Nil(t, job.AttemptedAt)
+                require.Empty(t, job.AttemptedBy)
+                require.WithinDuration(t, now, job.CreatedAt, 2*time.Second)
+                require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs)
+                require.Empty(t, job.Errors)
+                require.Nil(t, job.FinalizedAt)
+                require.Equal(t, "test_kind", job.Kind)
+                require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)
+                require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata)
+                require.Equal(t, rivercommon.PriorityDefault, job.Priority)
+                require.Equal(t, rivercommon.QueueDefault, job.Queue)
+                requireEqualTime(t, now.Add(time.Duration(i)*time.Minute), job.ScheduledAt)
+                require.Equal(t, rivertype.JobStateAvailable, job.State)
+                require.Equal(t, []string{"tag"}, job.Tags)
+            }
+        })
+
+        t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) {
+            exec, _ := setup(ctx, t)
+
+            insertParams := make([]*riverdriver.JobInsertFastParams, 10)
+            for i := 0; i < len(insertParams); i++ {
+                insertParams[i] = &riverdriver.JobInsertFastParams{
+                    EncodedArgs: []byte(`{"encoded": "args"}`),
+                    Kind:        "test_kind",
+                    MaxAttempts: rivercommon.MaxAttemptsDefault,
+                    Metadata:    []byte(`{"meta": "data"}`),
+                    Priority:    rivercommon.PriorityDefault,
+                    Queue:       rivercommon.QueueDefault,
+                    ScheduledAt: nil, // explicit nil
+                    State:       rivertype.JobStateAvailable,
+                    Tags:        []string{"tag"},
+                }
+            }
+
+            results, err := exec.JobInsertManyReturning(ctx, insertParams)
+            require.NoError(t, err)
+            require.Len(t, results, len(insertParams))
+
+            jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
+            require.NoError(t, err)
+            require.Len(t, jobsAfter, len(insertParams))
+            for _, job := range jobsAfter {
+                require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second)
+            }
+        })
+    })
+
     t.Run("JobInsertUnique", func(t *testing.T) {
         t.Parallel()
 
diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go
index d47b3eee..8609b450 100644
--- a/riverdriver/river_driver_interface.go
+++ b/riverdriver/river_driver_interface.go
@@ -118,6 +118,7 @@ type Executor interface {
     JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error)
     JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error)
     JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error)
+    JobInsertManyReturning(ctx context.Context, params []*JobInsertFastParams) ([]*rivertype.JobRow, error)
     JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error)
     JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error)
     JobListFields() string
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
index 61eebd1d..81de9241 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
@@ -745,6 +745,98 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull
     return &i, err
 }
 
+const jobInsertManyReturning = `-- name: JobInsertManyReturning :many
+INSERT INTO river_job(
+    args,
+    kind,
+    max_attempts,
+    metadata,
+    priority,
+    queue,
+    scheduled_at,
+    state,
+    tags
+) SELECT
+    unnest($1::jsonb[]),
+    unnest($2::text[]),
+    unnest($3::smallint[]),
+    unnest($4::jsonb[]),
+    unnest($5::smallint[]),
+    unnest($6::text[]),
+    unnest($7::timestamptz[]),
+    -- To avoid requiring pgx users to register the OID of the river_job_state[]
+    -- type, we cast the array to text[] and then to river_job_state.
+    unnest($8::text[])::river_job_state,
+    -- Unnest on a multi-dimensional array will fully flatten the array, so we
+    -- encode the tag list as a comma-separated string and split it in the
+    -- query.
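+    -- For example (hypothetical values): a job tagged ["urgent", "email"] is
+    -- passed in as the single string 'urgent,email' and split back out into
+    -- ARRAY['urgent','email'] here.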
+    string_to_array(unnest($9::text[]), ',')
+RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key
+`
+
+type JobInsertManyReturningParams struct {
+    Args        []string
+    Kind        []string
+    MaxAttempts []int16
+    Metadata    []string
+    Priority    []int16
+    Queue       []string
+    ScheduledAt []time.Time
+    State       []string
+    Tags        []string
+}
+
+func (q *Queries) JobInsertManyReturning(ctx context.Context, db DBTX, arg *JobInsertManyReturningParams) ([]*RiverJob, error) {
+    rows, err := db.QueryContext(ctx, jobInsertManyReturning,
+        pq.Array(arg.Args),
+        pq.Array(arg.Kind),
+        pq.Array(arg.MaxAttempts),
+        pq.Array(arg.Metadata),
+        pq.Array(arg.Priority),
+        pq.Array(arg.Queue),
+        pq.Array(arg.ScheduledAt),
+        pq.Array(arg.State),
+        pq.Array(arg.Tags),
+    )
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    var items []*RiverJob
+    for rows.Next() {
+        var i RiverJob
+        if err := rows.Scan(
+            &i.ID,
+            &i.Args,
+            &i.Attempt,
+            &i.AttemptedAt,
+            pq.Array(&i.AttemptedBy),
+            &i.CreatedAt,
+            pq.Array(&i.Errors),
+            &i.FinalizedAt,
+            &i.Kind,
+            &i.MaxAttempts,
+            &i.Metadata,
+            &i.Priority,
+            &i.Queue,
+            &i.State,
+            &i.ScheduledAt,
+            pq.Array(&i.Tags),
+            &i.UniqueKey,
+        ); err != nil {
+            return nil, err
+        }
+        items = append(items, &i)
+    }
+    if err := rows.Close(); err != nil {
+        return nil, err
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+    return items, nil
+}
+
 const jobInsertUnique = `-- name: JobInsertUnique :one
 INSERT INTO river_job(
     args,
diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go
index ba7599ab..7f5d0fa1 100644
--- a/riverdriver/riverdatabasesql/river_database_sql_driver.go
+++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go
@@ -286,6 +286,54 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns
     return jobRowFromInternal(job)
 }
 
+func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) {
+    insertJobsParams := &dbsqlc.JobInsertManyReturningParams{
+        Args:        make([]string, len(params)),
+        Kind:        make([]string, len(params)),
+        MaxAttempts: make([]int16, len(params)),
+        Metadata:    make([]string, len(params)),
+        Priority:    make([]int16, len(params)),
+        Queue:       make([]string, len(params)),
+        ScheduledAt: make([]time.Time, len(params)),
+        State:       make([]string, len(params)),
+        Tags:        make([]string, len(params)),
+    }
+    now := time.Now()
+
+    for i := 0; i < len(params); i++ {
+        params := params[i]
+
+        scheduledAt := now
+        if params.ScheduledAt != nil {
+            scheduledAt = *params.ScheduledAt
+        }
+
+        tags := params.Tags
+        if tags == nil {
+            tags = []string{}
+        }
+
+        defaultObject := "{}"
+
+        insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject)
+        insertJobsParams.Kind[i] = params.Kind
+        insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec
+        insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject)
+        insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec
+        insertJobsParams.Queue[i] = params.Queue
+        insertJobsParams.ScheduledAt[i] = scheduledAt
+        insertJobsParams.State[i] = string(params.State)
+        insertJobsParams.Tags[i] = strings.Join(tags, ",")
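+        // Reviewer note: joining with "," assumes individual tags never
+        // contain a comma themselves; tag validation is expected to be
+        // enforced at a higher layer. The query splits the string back
+        // into a text[] with string_to_array.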
+    }
+
+    items, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, insertJobsParams)
+    if err != nil {
+        return nil, interpretError(err)
+    }
+
+    return mapSliceError(items, jobRowFromInternal)
+}
+
 func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) {
     insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{
         Args: string(params.EncodedArgs),
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
index ff3decf2..df20035d 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
@@ -285,6 +285,34 @@ INSERT INTO river_job(
     @unique_key
 ) RETURNING *;
 
+-- name: JobInsertManyReturning :many
+INSERT INTO river_job(
+    args,
+    kind,
+    max_attempts,
+    metadata,
+    priority,
+    queue,
+    scheduled_at,
+    state,
+    tags
+) SELECT
+    unnest(@args::jsonb[]),
+    unnest(@kind::text[]),
+    unnest(@max_attempts::smallint[]),
+    unnest(@metadata::jsonb[]),
+    unnest(@priority::smallint[]),
+    unnest(@queue::text[]),
+    unnest(@scheduled_at::timestamptz[]),
+    -- To avoid requiring pgx users to register the OID of the river_job_state[]
+    -- type, we cast the array to text[] and then to river_job_state.
+    unnest(@state::text[])::river_job_state,
+    -- Unnest on a multi-dimensional array will fully flatten the array, so we
+    -- encode the tag list as a comma-separated string and split it in the
+    -- query.
+    string_to_array(unnest(@tags::text[]), ',')
+RETURNING *;
+
 -- name: JobInsertUnique :one
 INSERT INTO river_job(
     args,
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
index 5beabbe6..55138fe0 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
@@ -731,6 +731,95 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull
     return &i, err
 }
 
+const jobInsertManyReturning = `-- name: JobInsertManyReturning :many
+INSERT INTO river_job(
+    args,
+    kind,
+    max_attempts,
+    metadata,
+    priority,
+    queue,
+    scheduled_at,
+    state,
+    tags
+) SELECT
+    unnest($1::jsonb[]),
+    unnest($2::text[]),
+    unnest($3::smallint[]),
+    unnest($4::jsonb[]),
+    unnest($5::smallint[]),
+    unnest($6::text[]),
+    unnest($7::timestamptz[]),
+    -- To avoid requiring pgx users to register the OID of the river_job_state[]
+    -- type, we cast the array to text[] and then to river_job_state.
+    unnest($8::text[])::river_job_state,
+    -- Unnest on a multi-dimensional array will fully flatten the array, so we
+    -- encode the tag list as a comma-separated string and split it in the
+    -- query.
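+    -- Note that a job with no tags arrives as the empty string, and
+    -- string_to_array('', ',') yields an empty array rather than {''}, so
+    -- untagged jobs round-trip as expected.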
+    string_to_array(unnest($9::text[]), ',')
+RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key
+`
+
+type JobInsertManyReturningParams struct {
+    Args        [][]byte
+    Kind        []string
+    MaxAttempts []int16
+    Metadata    [][]byte
+    Priority    []int16
+    Queue       []string
+    ScheduledAt []time.Time
+    State       []string
+    Tags        []string
+}
+
+func (q *Queries) JobInsertManyReturning(ctx context.Context, db DBTX, arg *JobInsertManyReturningParams) ([]*RiverJob, error) {
+    rows, err := db.Query(ctx, jobInsertManyReturning,
+        arg.Args,
+        arg.Kind,
+        arg.MaxAttempts,
+        arg.Metadata,
+        arg.Priority,
+        arg.Queue,
+        arg.ScheduledAt,
+        arg.State,
+        arg.Tags,
+    )
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    var items []*RiverJob
+    for rows.Next() {
+        var i RiverJob
+        if err := rows.Scan(
+            &i.ID,
+            &i.Args,
+            &i.Attempt,
+            &i.AttemptedAt,
+            &i.AttemptedBy,
+            &i.CreatedAt,
+            &i.Errors,
+            &i.FinalizedAt,
+            &i.Kind,
+            &i.MaxAttempts,
+            &i.Metadata,
+            &i.Priority,
+            &i.Queue,
+            &i.State,
+            &i.ScheduledAt,
+            &i.Tags,
+            &i.UniqueKey,
+        ); err != nil {
+            return nil, err
+        }
+        items = append(items, &i)
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+    return items, nil
+}
+
 const jobInsertUnique = `-- name: JobInsertUnique :one
 INSERT INTO river_job(
     args,
diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
index 36da4d81..b60e9671 100644
--- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go
+++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
@@ -278,6 +278,54 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns
     return jobRowFromInternal(job)
 }
 
+func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) {
+    insertJobsParams := &dbsqlc.JobInsertManyReturningParams{
+        Args:        make([][]byte, len(params)),
+        Kind:        make([]string, len(params)),
+        MaxAttempts: make([]int16, len(params)),
+        Metadata:    make([][]byte, len(params)),
+        Priority:    make([]int16, len(params)),
+        Queue:       make([]string, len(params)),
+        ScheduledAt: make([]time.Time, len(params)),
+        State:       make([]string, len(params)),
+        Tags:        make([]string, len(params)),
+    }
+    now := time.Now()
+
+    for i := 0; i < len(params); i++ {
+        params := params[i]
+
+        scheduledAt := now
+        if params.ScheduledAt != nil {
+            scheduledAt = *params.ScheduledAt
+        }
+
+        tags := params.Tags
+        if tags == nil {
+            tags = []string{}
+        }
+
+        defaultObject := []byte("{}")
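+        // Reviewer note: empty or nil args/metadata are normalized to an
+        // empty JSON object, since an empty string would be rejected by
+        // the jsonb[] cast in the query.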
+
+        insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject)
+        insertJobsParams.Kind[i] = params.Kind
+        insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec
+        insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject)
+        insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec
+        insertJobsParams.Queue[i] = params.Queue
+        insertJobsParams.ScheduledAt[i] = scheduledAt
+        insertJobsParams.State[i] = string(params.State)
+        insertJobsParams.Tags[i] = strings.Join(tags, ",")
+    }
+
+    items, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, insertJobsParams)
+    if err != nil {
+        return nil, interpretError(err)
+    }
+
+    return mapSliceError(items, jobRowFromInternal)
+}
+
 func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) {
     insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{
         Args: params.EncodedArgs,
diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go
index 400c9d40..8088e05a 100644
--- a/rivershared/util/sliceutil/slice_util.go
+++ b/rivershared/util/sliceutil/slice_util.go
@@ -4,6 +4,15 @@
 // therefore omitted from the utilities in `slices`.
 package sliceutil
 
+// DefaultIfEmpty returns the default slice if the input slice is nil or empty,
+// otherwise it returns the input slice.
+func DefaultIfEmpty[T any](input []T, defaultSlice []T) []T {
+    if len(input) == 0 {
+        return defaultSlice
+    }
+    return input
+}
+
 // GroupBy returns an object composed of keys generated from the results of
 // running each element of collection through keyFunc.
 func GroupBy[T any, U comparable](collection []T, keyFunc func(T) U) map[U][]T {
diff --git a/rivershared/util/sliceutil/slice_util_test.go b/rivershared/util/sliceutil/slice_util_test.go
index 4b2bc49c..4eb57e56 100644
--- a/rivershared/util/sliceutil/slice_util_test.go
+++ b/rivershared/util/sliceutil/slice_util_test.go
@@ -8,6 +8,21 @@ import (
     "github.com/stretchr/testify/require"
 )
 
+func TestDefaultIfEmpty(t *testing.T) {
+    t.Parallel()
+
+    result1 := DefaultIfEmpty([]int{1, 2, 3}, []int{4, 5, 6})
+    result2 := DefaultIfEmpty([]int{}, []int{4, 5, 6})
+    result3 := DefaultIfEmpty(nil, []int{4, 5, 6})
+
+    require.Len(t, result1, 3)
+    require.Len(t, result2, 3)
+    require.Len(t, result3, 3)
+    require.Equal(t, []int{1, 2, 3}, result1)
+    require.Equal(t, []int{4, 5, 6}, result2)
+    require.Equal(t, []int{4, 5, 6}, result3)
+}
+
 func TestGroupBy(t *testing.T) {
     t.Parallel()
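
A closing note for readers: `mapSliceError`, called by both driver
implementations above, is not part of this patch; it already exists in each
driver package. For reference, a helper of that shape plausibly looks like the
following sketch (the repository's actual implementation may differ):

    // mapSliceError maps each element of a slice through an error-returning
    // function, stopping at the first error. Sketch only; assumed shape.
    func mapSliceError[T any, U any](collection []T, mapFunc func(T) (U, error)) ([]U, error) {
        result := make([]U, len(collection))
        for i, item := range collection {
            var err error
            result[i], err = mapFunc(item)
            if err != nil {
                return nil, err
            }
        }
        return result, nil
    }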