From 486aad72a70485698bf4dea04640cc32f2929528 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 24 Oct 2024 17:32:04 -0500 Subject: [PATCH] add ClientFromContext examples for both pgx + dbsql --- example_client_from_context_dbsql_test.go | 87 ++++++++++++++++++++++ example_client_from_context_test.go | 88 +++++++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 example_client_from_context_dbsql_test.go create mode 100644 example_client_from_context_test.go diff --git a/example_client_from_context_dbsql_test.go b/example_client_from_context_dbsql_test.go new file mode 100644 index 00000000..be6c4590 --- /dev/null +++ b/example_client_from_context_dbsql_test.go @@ -0,0 +1,87 @@ +package river_test + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "time" + + _ "github.com/jackc/pgx/v5/stdlib" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver/riverdatabasesql" + "github.com/riverqueue/river/rivershared/util/slogutil" +) + +type ContextClientSQLArgs struct{} + +func (args ContextClientSQLArgs) Kind() string { return "ContextClientSQLWorker" } + +type ContextClientSQLWorker struct { + river.WorkerDefaults[ContextClientSQLArgs] +} + +func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[ContextClientSQLArgs]) error { + client := river.ClientFromContext[*sql.Tx](ctx) + if client == nil { + fmt.Println("client not found in context") + return errors.New("client not found in context") + } + + fmt.Printf("client found in context, id=%s\n", client.ID()) + return nil +} + +// Example_clientFromContext demonstrates how to extract the River client from the +// worker context. +func Example_clientFromContextDBSQL() { + ctx := context.Background() + + config := riverinternaltest.DatabaseConfig("river_test_example") + db, err := sql.Open("pgx", config.ConnString()) + if err != nil { + panic(err) + } + defer db.Close() + + workers := river.NewWorkers() + river.AddWorker(workers, &ContextClientSQLWorker{}) + + riverClient, err := river.NewClient(riverdatabasesql.New(db), &river.Config{ + ID: "ClientFromContextClientSQL", + Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 10}, + }, + FetchCooldown: 10 * time.Millisecond, + FetchPollInterval: 10 * time.Millisecond, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Not strictly needed, but used to help this test wait until job is worked. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + if _, err := riverClient.Insert(ctx, ContextClientSQLArgs{}, nil); err != nil { + panic(err) + } + + waitForNJobs(subscribeChan, 1) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // client found in context, id=ClientFromContextClientSQL +} diff --git a/example_client_from_context_test.go b/example_client_from_context_test.go new file mode 100644 index 00000000..cb1e1707 --- /dev/null +++ b/example_client_from_context_test.go @@ -0,0 +1,88 @@ +package river_test + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/util/slogutil" +) + +type ContextClientArgs struct{} + +func (args ContextClientArgs) Kind() string { return "ContextClientWorker" } + +type ContextClientWorker struct { + river.WorkerDefaults[ContextClientArgs] +} + +func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextClientArgs]) error { + client := river.ClientFromContext[pgx.Tx](ctx) + if client == nil { + fmt.Println("client not found in context") + return errors.New("client not found in context") + } + + fmt.Printf("client found in context, id=%s\n", client.ID()) + return nil +} + +// Example_clientFromContext demonstrates how to extract the River client from the +// worker context. +func Example_clientFromContext() { + ctx := context.Background() + + dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example")) + if err != nil { + panic(err) + } + defer dbPool.Close() + + // Required for the purpose of this test, but not necessary in real usage. + if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { + panic(err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &ContextClientWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + ID: "ClientFromContextClient", + Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 10}, + }, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Not strictly needed, but used to help this test wait until job is worked. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + if _, err = riverClient.Insert(ctx, ContextClientArgs{}, nil); err != nil { + panic(err) + } + + waitForNJobs(subscribeChan, 1) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + // Output: + // client found in context, id=ClientFromContextClient +}