diff --git a/CHANGELOG.md b/CHANGELOG.md index b03e5a6e..7dcdae60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed a problem in `riverpgxv5`'s `Listener` where it wouldn't unset an internal connection if `Close` returned an error, making the listener not reusable. Thanks @mfrister for pointing this one out! [PR #246](https://github.com/riverqueue/river/pull/246). + ## [0.0.24] - 2024-02-29 ### Fixed diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 2e3f3d6c..fb1919e6 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1617,9 +1617,6 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, getDriverWithPool driver := getDriverWithPool(ctx, t) listener := driver.GetListener() - t.Cleanup(func() { require.NoError(t, listener.Close(ctx)) }) - - require.NoError(t, listener.Connect(ctx)) return listener, &testListenerBundle[TTx]{ driver: driver, @@ -1630,6 +1627,11 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, getDriverWithPool func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithPool func(ctx context.Context, t *testing.T) riverdriver.Driver[TTx]) { t.Helper() + connectListener := func(ctx context.Context, t *testing.T, listener riverdriver.Listener) { + require.NoError(t, listener.Connect(ctx)) + t.Cleanup(func() { require.NoError(t, listener.Close(ctx)) }) + } + requireNoNotification := func(ctx context.Context, t *testing.T, listener riverdriver.Listener) { t.Helper() @@ -1653,11 +1655,22 @@ func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithP return notification } + t.Run("Close", func(t *testing.T) { + t.Parallel() + + t.Run("NoOpIfNotConnected", func(t *testing.T) { + listener, _ := setupListener(ctx, t, getDriverWithPool) + require.NoError(t, listener.Close(ctx)) + }) + }) + t.Run("RoundTrip", func(t *testing.T) { t.Parallel() listener, bundle := setupListener(ctx, t, getDriverWithPool) + connectListener(ctx, t, listener) + require.NoError(t, listener.Listen(ctx, "topic1")) require.NoError(t, listener.Listen(ctx, "topic2")) @@ -1695,6 +1708,8 @@ func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithP listener, bundle := setupListener(ctx, t, getDriverWithPool) + connectListener(ctx, t, listener) + require.NoError(t, listener.Listen(ctx, "topic1")) execTx, err := bundle.exec.Begin(ctx) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 1f6c922e..4a94ef49 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -475,12 +475,14 @@ func (l *Listener) Close(ctx context.Context) error { return nil } - if err := l.conn.Conn().Close(ctx); err != nil { - return err - } + err := l.conn.Conn().Close(ctx) + + // Regardless of the error state returned above, always release and unset + // the listener's local connection. l.conn.Release() l.conn = nil - return nil + + return err } func (l *Listener) Connect(ctx context.Context) error {