From 772eba515c8506909c70ca2cbfd3acec3de6f187 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 10 Sep 2024 15:17:19 -0500 Subject: [PATCH] new InsertMany/InsertManyTx with return values This adds new implementations of `InsertMany` / `InsertManyTx` that use the multirow `VALUES` syntax to allow the new rows to be returned upon insert. The alternative `COPY FROM ` implementation has been renamed to `InsertManyFast` / `InsertManyFastTx`. The expectation is that these forms will only be needed in cases where an extremely large number of records is being inserted simultaneously, whereas the new form is more user-friendly for the vast majority of other cases. --- client.go | 136 +++++++++++ client_test.go | 450 +++++++++++++++++++++++++++++++++++ example_batch_insert_test.go | 4 +- rivertest/rivertest_test.go | 14 +- 4 files changed, 595 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index a73d35fd..38892ac9 100644 --- a/client.go +++ b/client.go @@ -1326,6 +1326,142 @@ type InsertManyParams struct { InsertOpts *InsertOpts } +// InsertMany inserts many jobs at once. Each job is inserted as an +// InsertManyParams tuple, which takes job args along with an optional set of +// insert options, which override insert options provided by an +// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. +// The provided context is used for the underlying Postgres inserts and can be +// used to cancel the operation or apply a timeout. +// +// count, err := client.InsertMany(ctx, []river.InsertManyParams{ +// {Args: BatchInsertArgs{}}, +// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, +// }) +// if err != nil { +// // handle error +// } +// +// Job uniqueness is not respected when using InsertMany due to unique inserts +// using an internal transaction and advisory lock that might lead to +// significant lock contention. Insert unique jobs using Insert instead. +// +// Job uniqueness is not respected when using InsertMany due to unique inserts +// using an internal transaction and advisory lock that might lead to +// significant lock contention. Insert unique jobs using Insert instead. +func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { + if !c.driver.HasPool() { + return nil, errNoDriverDBPool + } + + insertParams, err := c.insertManyParams(params) + if err != nil { + return nil, err + } + + // Wrap in a transaction in case we need to notify about inserts. + tx, err := c.driver.GetExecutor().Begin(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + inserted, err := c.insertMany(ctx, tx, insertParams) + if err != nil { + return nil, err + } + if err := tx.Commit(ctx); err != nil { + return nil, err + } + return inserted, nil +} + +// InsertManyTx inserts many jobs at once. Each job is inserted as an +// InsertManyParams tuple, which takes job args along with an optional set of +// insert options, which override insert options provided by an +// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. +// The provided context is used for the underlying Postgres inserts and can be +// used to cancel the operation or apply a timeout. +// +// count, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{ +// {Args: BatchInsertArgs{}}, +// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, +// }) +// if err != nil { +// // handle error +// } +// +// Job uniqueness is not respected when using InsertMany due to unique inserts +// using an internal transaction and advisory lock that might lead to +// significant lock contention. Insert unique jobs using Insert instead. +// +// This variant lets a caller insert jobs atomically alongside other database +// changes. An inserted job isn't visible to be worked until the transaction +// commits, and if the transaction rolls back, so too is the inserted job. +func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { + insertParams, err := c.insertManyParams(params) + if err != nil { + return nil, err + } + + exec := c.driver.UnwrapExecutor(tx) + return c.insertMany(ctx, exec, insertParams) +} + +func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { + jobRows, err := tx.JobInsertManyReturning(ctx, insertParams) + if err != nil { + return nil, err + } + + queues := make([]string, 0, 10) + for _, params := range insertParams { + if params.State == rivertype.JobStateAvailable { + queues = append(queues, params.Queue) + } + } + if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil { + return nil, err + } + + return sliceutil.Map(jobRows, + func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult { + return &rivertype.JobInsertResult{Job: jobRow} + }, + ), nil +} + +// Validates input parameters for an a batch insert operation and generates a +// set of batch insert parameters. +func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { + if len(params) < 1 { + return nil, errors.New("no jobs to insert") + } + + insertParams := make([]*riverdriver.JobInsertFastParams, len(params)) + for i, param := range params { + if err := c.validateJobArgs(param.Args); err != nil { + return nil, err + } + + if param.InsertOpts != nil { + // UniqueOpts aren't support for batch inserts because they use PG + // advisory locks to work, and taking many locks simultaneously + // could easily lead to contention and deadlocks. + if !param.InsertOpts.UniqueOpts.isEmpty() { + return nil, errors.New("UniqueOpts are not supported for batch inserts") + } + } + + var err error + insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts) + if err != nil { + return nil, err + } + } + + return insertParams, nil +} + // InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, // making the operation quite fast and memory efficient. Each job is inserted as // an InsertManyParams tuple, which takes job args along with an optional set of diff --git a/client_test.go b/client_test.go index 993da3b0..e38cbd24 100644 --- a/client_test.go +++ b/client_test.go @@ -1938,6 +1938,456 @@ func Test_Client_InsertManyFastTx(t *testing.T) { }) } +func Test_Client_InsertMany(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + return client, &testBundle{dbPool: dbPool} + } + + t.Run("SucceedsWithMultipleJobs", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + now := time.Now().UTC() + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{Name: "Foo"}, InsertOpts: &InsertOpts{Metadata: []byte(`{"a": "b"}`), Queue: "foo", Priority: 2}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: now.Add(time.Minute)}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + require.False(t, results[0].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[0].Job.Attempt) + require.Nil(t, results[0].Job.AttemptedAt) + require.WithinDuration(t, now, results[0].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[0].Job.AttemptedBy) + require.Positive(t, results[0].Job.ID) + require.JSONEq(t, `{"name": "Foo"}`, string(results[0].Job.EncodedArgs)) + require.Empty(t, results[0].Job.Errors) + require.Nil(t, results[0].Job.FinalizedAt) + require.Equal(t, "noOp", results[0].Job.Kind) + require.Equal(t, 25, results[0].Job.MaxAttempts) + require.JSONEq(t, `{"a": "b"}`, string(results[0].Job.Metadata)) + require.Equal(t, 2, results[0].Job.Priority) + require.Equal(t, "foo", results[0].Job.Queue) + require.WithinDuration(t, now, results[0].Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.Empty(t, results[0].Job.Tags) + require.Empty(t, results[0].Job.UniqueKey) + + require.False(t, results[1].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[1].Job.Attempt) + require.Nil(t, results[1].Job.AttemptedAt) + require.WithinDuration(t, now, results[1].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[1].Job.AttemptedBy) + require.Positive(t, results[1].Job.ID) + require.JSONEq(t, `{"name": ""}`, string(results[1].Job.EncodedArgs)) + require.Empty(t, results[1].Job.Errors) + require.Nil(t, results[1].Job.FinalizedAt) + require.Equal(t, "noOp", results[1].Job.Kind) + require.Equal(t, 25, results[1].Job.MaxAttempts) + require.JSONEq(t, `{}`, string(results[1].Job.Metadata)) + require.Equal(t, 1, results[1].Job.Priority) + require.Equal(t, "default", results[1].Job.Queue) + require.WithinDuration(t, now.Add(time.Minute), results[1].Job.ScheduledAt, time.Millisecond) + require.Equal(t, rivertype.JobStateScheduled, results[1].Job.State) + require.Empty(t, results[1].Job.Tags) + require.Empty(t, results[1].Job.UniqueKey) + + require.NotEqual(t, results[0].Job.ID, results[1].Job.ID) + + jobs, err := client.driver.GetExecutor().JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind()) + }) + + t.Run("TriggersImmediateWork", func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + _, bundle := setup(t) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + t.Cleanup(cancel) + + doneCh := make(chan struct{}) + close(doneCh) // don't need to block any jobs from completing + startedCh := make(chan int64) + + config := newTestConfig(t, makeAwaitCallback(startedCh, doneCh)) + config.FetchCooldown = 20 * time.Millisecond + config.FetchPollInterval = 20 * time.Second // essentially disable polling + config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}, "another_queue": {MaxWorkers: 1}} + + client := newTestClient(t, bundle.dbPool, config) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: callbackArgs{}}, + {Args: callbackArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + // Wait for the client to be ready by waiting for a job to be executed: + riversharedtest.WaitOrTimeoutN(t, startedCh, 2) + + // Now that we've run one job, we shouldn't take longer than the cooldown to + // fetch another after insertion. LISTEN/NOTIFY should ensure we find out + // about the inserted job much faster than the poll interval. + // + // Note: we specifically use a different queue to ensure that the notify + // limiter is immediately to fire on this queue. + results, err = client.InsertMany(ctx, []InsertManyParams{ + {Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + + select { + case <-startedCh: + // As long as this is meaningfully shorter than the poll interval, we can be + // sure the re-fetch came from listen/notify. + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for another_queue job to start") + } + + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("DoesNotTriggerInsertNotificationForNonAvailableJob", func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + _, bundle := setup(t) + + config := newTestConfig(t, nil) + config.FetchCooldown = 5 * time.Second + config.FetchPollInterval = 5 * time.Second + client := newTestClient(t, bundle.dbPool, config) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + // Queue `a` should be "due" to be triggered because it wasn't triggered above. + require.True(t, client.insertNotifyLimiter.ShouldTrigger("a")) + // Queue `b` should *not* be "due" to be triggered because it was triggered above. + require.False(t, client.insertNotifyLimiter.ShouldTrigger("b")) + + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("WithInsertOptsScheduledAtZeroTime", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + + jobs, err := client.driver.GetExecutor().JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(noOpArgs{}).Kind()) + jobRow := jobs[0] + require.WithinDuration(t, time.Now(), jobRow.ScheduledAt, 2*time.Second) + }) + + t.Run("ErrorsOnInvalidQueueName", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}}, + }) + require.ErrorContains(t, err, "queue name is invalid") + require.Nil(t, results) + }) + + t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, _ = setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.ErrorIs(t, err, errNoDriverDBPool) + require.Nil(t, results) + }) + + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{}) + require.EqualError(t, err, "no jobs to insert") + require.Nil(t, results) + }) + + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + var unknownJobKindErr *UnknownJobKindError + require.ErrorAs(t, err, &unknownJobKindErr) + require.Equal(t, (&unregisteredJobArgs{}).Kind(), unknownJobKindErr.Kind) + require.Nil(t, results) + }) + + t.Run("AllowsUnknownJobKindWithoutWorkers", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + client.config.Workers = nil + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, + }) + require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") + require.Empty(t, results) + }) +} + +func Test_Client_InsertManyTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + tx pgx.Tx + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { tx.Rollback(ctx) }) + + return client, &testBundle{ + tx: tx, + } + } + + t.Run("SucceedsWithMultipleJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + now := time.Now().UTC() + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{Name: "Foo"}, InsertOpts: &InsertOpts{Metadata: []byte(`{"a": "b"}`), Queue: "foo", Priority: 2}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: now.Add(time.Minute)}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + require.False(t, results[0].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[0].Job.Attempt) + require.Nil(t, results[0].Job.AttemptedAt) + require.WithinDuration(t, now, results[0].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[0].Job.AttemptedBy) + require.Positive(t, results[0].Job.ID) + require.JSONEq(t, `{"name": "Foo"}`, string(results[0].Job.EncodedArgs)) + require.Empty(t, results[0].Job.Errors) + require.Nil(t, results[0].Job.FinalizedAt) + require.Equal(t, "noOp", results[0].Job.Kind) + require.Equal(t, 25, results[0].Job.MaxAttempts) + require.JSONEq(t, `{"a": "b"}`, string(results[0].Job.Metadata)) + require.Equal(t, 2, results[0].Job.Priority) + require.Equal(t, "foo", results[0].Job.Queue) + require.WithinDuration(t, now, results[0].Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.Empty(t, results[0].Job.Tags) + require.Empty(t, results[0].Job.UniqueKey) + + require.False(t, results[1].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[1].Job.Attempt) + require.Nil(t, results[1].Job.AttemptedAt) + require.WithinDuration(t, now, results[1].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[1].Job.AttemptedBy) + require.Positive(t, results[1].Job.ID) + require.JSONEq(t, `{"name": ""}`, string(results[1].Job.EncodedArgs)) + require.Empty(t, results[1].Job.Errors) + require.Nil(t, results[1].Job.FinalizedAt) + require.Equal(t, "noOp", results[1].Job.Kind) + require.Equal(t, 25, results[1].Job.MaxAttempts) + require.JSONEq(t, `{}`, string(results[1].Job.Metadata)) + require.Equal(t, 1, results[1].Job.Priority) + require.Equal(t, "default", results[1].Job.Queue) + require.WithinDuration(t, now.Add(time.Minute), results[1].Job.ScheduledAt, time.Millisecond) + require.Equal(t, rivertype.JobStateScheduled, results[1].Job.State) + require.Empty(t, results[1].Job.Tags) + require.Empty(t, results[1].Job.UniqueKey) + + require.NotEqual(t, results[0].Job.ID, results[1].Job.ID) + + jobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind()) + }) + + t.Run("SetsScheduledAtToNowByDefault", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) + require.NoError(t, err) + require.Len(t, results, 1) + + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.WithinDuration(t, time.Now(), results[0].Job.ScheduledAt, 2*time.Second) + + insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, insertedJobs, 1) + require.Equal(t, rivertype.JobStateAvailable, insertedJobs[0].State) + require.WithinDuration(t, time.Now(), insertedJobs[0].ScheduledAt, 2*time.Second) + }) + + t.Run("SupportsScheduledJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + startClient(ctx, t, client) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) + require.NoError(t, err) + require.Len(t, results, 1) + + require.Equal(t, rivertype.JobStateScheduled, results[0].Job.State) + require.WithinDuration(t, time.Now().Add(time.Minute), results[0].Job.ScheduledAt, 2*time.Second) + }) + + // A client's allowed to send nil to their driver so they can, for example, + // easily use test transactions in their test suite. + t.Run("WithDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{}) + require.EqualError(t, err, "no jobs to insert") + require.Nil(t, results) + }) + + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + var unknownJobKindErr *UnknownJobKindError + require.ErrorAs(t, err, &unknownJobKindErr) + require.Equal(t, (&unregisteredJobArgs{}).Kind(), unknownJobKindErr.Kind) + require.Nil(t, results) + }) + + t.Run("AllowsUnknownJobKindWithoutWorkers", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + client.config.Workers = nil + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, + }) + require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") + require.Empty(t, results) + }) +} + func Test_Client_JobGet(t *testing.T) { t.Parallel() diff --git a/example_batch_insert_test.go b/example_batch_insert_test.go index a3b8274d..392d7786 100644 --- a/example_batch_insert_test.go +++ b/example_batch_insert_test.go @@ -67,7 +67,7 @@ func Example_batchInsert() { panic(err) } - count, err := riverClient.InsertManyFast(ctx, []river.InsertManyParams{ + results, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, @@ -77,7 +77,7 @@ func Example_batchInsert() { if err != nil { panic(err) } - fmt.Printf("Inserted %d jobs\n", count) + fmt.Printf("Inserted %d jobs\n", len(results)) waitForNJobs(subscribeChan, 5) diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index 7201ed0a..c51a59db 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -168,7 +168,7 @@ func TestRequireInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -440,7 +440,7 @@ func TestRequireNotInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -738,7 +738,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -758,7 +758,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}}, @@ -847,7 +847,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -867,7 +867,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job2Args{Int: 123}}, {Args: Job1Args{String: "foo"}}, }) @@ -888,7 +888,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}},