Skip to content

Commit

Permalink
add JobInsertManyReturning to drivers
Browse files Browse the repository at this point in the history
This new query allows many rows to be inserted (up to ~7280 as of now)
while also allowing the new records to be returned. While this is not
quite as fast as the `COPY FROM` option when loading many rows, it
provides a better UX for the vast majority of use cases.

It _does_ require that we ditch sqlc for this one query, because sqlc
does not support the multirow values insert syntax due to the dynamic
nature of the param placeholders.
  • Loading branch information
bgentry committed Sep 12, 2024
1 parent 7d98386 commit 6bb7e88
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 1 deletion.
79 changes: 78 additions & 1 deletion internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/testfactory" //nolint:depguard
"github.com/riverqueue/river/rivershared/testfactory"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
Expand Down Expand Up @@ -1152,6 +1152,83 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
})
})

t.Run("JobInsertManyReturning", func(t *testing.T) {
t.Parallel()

t.Run("AllArgs", func(t *testing.T) {
exec, _ := setup(ctx, t)

now := time.Now().UTC()

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: ptrutil.Ptr(now.Add(time.Duration(i) * time.Minute)),
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
}

jobRows, err := exec.JobInsertManyReturning(ctx, insertParams)
require.NoError(t, err)
require.Len(t, jobRows, len(insertParams))

for i, job := range jobRows {
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.Empty(t, job.AttemptedBy)
require.WithinDuration(t, now, job.CreatedAt, 2*time.Second)
require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs)
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
require.Equal(t, "test_kind", job.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)
require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata)
require.Equal(t, rivercommon.PriorityDefault, job.Priority)
require.Equal(t, rivercommon.QueueDefault, job.Queue)
requireEqualTime(t, now.Add(time.Duration(i)*time.Minute), job.ScheduledAt)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.Equal(t, []string{"tag"}, job.Tags)
}
})

t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) {
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: nil, // explicit nil
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
}

results, err := exec.JobInsertManyReturning(ctx, insertParams)
require.NoError(t, err)
require.Len(t, results, len(insertParams))

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second)
}
})
})

t.Run("JobInsertUnique", func(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Executor interface {
JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error)
JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error)
JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error)
// JobInsertManyReturning inserts all of the given jobs and returns the
// resulting job rows.
JobInsertManyReturning(ctx context.Context, params []*JobInsertFastParams) ([]*rivertype.JobRow, error)
JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error)
JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error)
JobListFields() string
Expand Down
92 changes: 92 additions & 0 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,54 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns
return jobRowFromInternal(job)
}

// JobInsertManyReturning inserts every job described by params with a single
// query and returns the rows that query produces, converted with
// jobRowFromInternal.
//
// The query takes one slice per column, so params are transposed into
// parallel slices first. Jobs with a nil ScheduledAt all share one timestamp
// captured at the start of the call. A query error is returned after being
// passed through interpretError.
func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) {
	var (
		count       = len(params)
		emptyObject = "{}" // substituted for empty args/metadata
		fallbackAt  = time.Now()
	)

	queryParams := &dbsqlc.JobInsertManyReturningParams{
		Args:        make([]string, count),
		Kind:        make([]string, count),
		MaxAttempts: make([]int16, count),
		Metadata:    make([]string, count),
		Priority:    make([]int16, count),
		Queue:       make([]string, count),
		ScheduledAt: make([]time.Time, count),
		State:       make([]string, count),
		Tags:        make([]string, count),
	}

	for i, job := range params {
		scheduledAt := fallbackAt
		if job.ScheduledAt != nil {
			scheduledAt = *job.ScheduledAt
		}

		queryParams.Args[i] = valutil.ValOrDefault(string(job.EncodedArgs), emptyObject)
		queryParams.Kind[i] = job.Kind
		// Clamp to the int16 maximum before narrowing.
		queryParams.MaxAttempts[i] = int16(min(job.MaxAttempts, math.MaxInt16)) //nolint:gosec
		queryParams.Metadata[i] = valutil.ValOrDefault(string(job.Metadata), emptyObject)
		queryParams.Priority[i] = int16(min(job.Priority, math.MaxInt16)) //nolint:gosec
		queryParams.Queue[i] = job.Queue
		queryParams.ScheduledAt[i] = scheduledAt
		queryParams.State[i] = string(job.State)
		// Each job's tags are flattened into one comma-separated string;
		// strings.Join yields "" for a nil or empty tag list.
		// NOTE(review): a tag containing a comma would not survive this
		// encoding — presumably tags are validated upstream; confirm.
		queryParams.Tags[i] = strings.Join(job.Tags, ",")
	}

	rows, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, queryParams)
	if err != nil {
		return nil, interpretError(err)
	}

	return mapSliceError(rows, jobRowFromInternal)
}

func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) {
insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{
Args: string(params.EncodedArgs),
Expand Down
28 changes: 28 additions & 0 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,34 @@ INSERT INTO river_job(
@unique_key
) RETURNING *;

-- name: JobInsertManyReturning :many
-- Inserts a batch of jobs in a single statement and returns every inserted
-- row. Each column is supplied as one array parameter and expanded into rows
-- with unnest.
-- NOTE(review): presumably all arrays must have the same length (one element
-- per job) -- confirm against the calling driver code.
INSERT INTO river_job(
args,
kind,
max_attempts,
metadata,
priority,
queue,
scheduled_at,
state,
tags
) SELECT
unnest(@args::jsonb[]),
unnest(@kind::text[]),
unnest(@max_attempts::smallint[]),
unnest(@metadata::jsonb[]),
unnest(@priority::smallint[]),
unnest(@queue::text[]),
unnest(@scheduled_at::timestamptz[]),
-- To avoid requiring pgx users to register the OID of the river_job_state[]
-- type, the states are passed as text[] and each unnested element is then
-- cast to river_job_state.
unnest(@state::text[])::river_job_state,
-- Unnest on a multi-dimensional array will fully flatten the array, so we
-- encode each job's tag list as a comma-separated string and split it in the
-- query. A tag that itself contains a comma would therefore be split into
-- separate tags.
string_to_array(unnest(@tags::text[]), ',')
RETURNING *;

-- name: JobInsertUnique :one
INSERT INTO river_job(
args,
Expand Down
89 changes: 89 additions & 0 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6bb7e88

Please sign in to comment.