Skip to content

Commit

Permalink
add ClientFromContext examples for both pgx + dbsql
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Oct 24, 2024
1 parent 56ddf09 commit 486aad7
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
87 changes: 87 additions & 0 deletions example_client_from_context_dbsql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package river_test

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"time"

_ "github.com/jackc/pgx/v5/stdlib"

"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/rivershared/util/slogutil"
)

type ContextClientSQLArgs struct{}

func (args ContextClientSQLArgs) Kind() string { return "ContextClientSQLWorker" }

type ContextClientSQLWorker struct {
river.WorkerDefaults[ContextClientSQLArgs]
}

func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[ContextClientSQLArgs]) error {
client := river.ClientFromContext[*sql.Tx](ctx)
if client == nil {
fmt.Println("client not found in context")
return errors.New("client not found in context")
}

fmt.Printf("client found in context, id=%s\n", client.ID())
return nil
}

// Example_clientFromContext demonstrates how to extract the River client from the
// worker context.
func Example_clientFromContextDBSQL() {
ctx := context.Background()

config := riverinternaltest.DatabaseConfig("river_test_example")
db, err := sql.Open("pgx", config.ConnString())
if err != nil {
panic(err)
}
defer db.Close()

workers := river.NewWorkers()
river.AddWorker(workers, &ContextClientSQLWorker{})

riverClient, err := river.NewClient(riverdatabasesql.New(db), &river.Config{
ID: "ClientFromContextClientSQL",
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 10},
},
FetchCooldown: 10 * time.Millisecond,
FetchPollInterval: 10 * time.Millisecond,
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
}

// Not strictly needed, but used to help this test wait until job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

if err := riverClient.Start(ctx); err != nil {
panic(err)
}
if _, err := riverClient.Insert(ctx, ContextClientSQLArgs{}, nil); err != nil {
panic(err)
}

waitForNJobs(subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

// Output:
// client found in context, id=ClientFromContextClientSQL
}
88 changes: 88 additions & 0 deletions example_client_from_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package river_test

import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/util/slogutil"
)

type ContextClientArgs struct{}

func (args ContextClientArgs) Kind() string { return "ContextClientWorker" }

type ContextClientWorker struct {
river.WorkerDefaults[ContextClientArgs]
}

func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextClientArgs]) error {
client := river.ClientFromContext[pgx.Tx](ctx)
if client == nil {
fmt.Println("client not found in context")
return errors.New("client not found in context")
}

fmt.Printf("client found in context, id=%s\n", client.ID())
return nil
}

// Example_clientFromContext demonstrates how to extract the River client from the
// worker context.
func Example_clientFromContext() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
if err != nil {
panic(err)
}
defer dbPool.Close()

// Required for the purpose of this test, but not necessary in real usage.
if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
panic(err)
}

workers := river.NewWorkers()
river.AddWorker(workers, &ContextClientWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
ID: "ClientFromContextClient",
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 10},
},
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
}

// Not strictly needed, but used to help this test wait until job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

if err := riverClient.Start(ctx); err != nil {
panic(err)
}
if _, err = riverClient.Insert(ctx, ContextClientArgs{}, nil); err != nil {
panic(err)
}

waitForNJobs(subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

// Output:
// client found in context, id=ClientFromContextClient
}

0 comments on commit 486aad7

Please sign in to comment.