Skip to content

Commit

Permalink
More robust listener close in riverpgxv5
Browse files Browse the repository at this point in the history
As part of #239, we're observing the possibility of the notifier going
into a hot loop where it's trying to reopen a listener connection, but
the listener won't let it because it thinks the connection is already
open.

A suspect is the listener's `Close` implementation, which in the event
of an error, returns the error and fails to release an underlying
connection, putting it into a state where it's never reusable.

Here, modify `Close` so that it always releases and unsets an underlying
connection regardless of the error state returned.

I tried to add a test case for this, but reading through pgx and net
source code, I couldn't find any way to simulate an error from `Close`
(I thought a cancelled context would do it, but it does not), so I had
to leave it.
  • Loading branch information
brandur committed Mar 1, 2024
1 parent 035ba59 commit 02267da
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fixed a problem in `riverpgxv5`'s `Listener` where it wouldn't unset an internal connection if `Close` returned an error, making the listener not reusable. Thanks @mfrister for pointing this one out! [PR #246](https://github.com/riverqueue/river/pull/246).

## [0.0.24] - 2024-02-29

### Fixed
Expand Down
38 changes: 35 additions & 3 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,9 +1617,6 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, getDriverWithPool
driver := getDriverWithPool(ctx, t)

listener := driver.GetListener()
t.Cleanup(func() { require.NoError(t, listener.Close(ctx)) })

require.NoError(t, listener.Connect(ctx))

return listener, &testListenerBundle[TTx]{
driver: driver,
Expand All @@ -1630,6 +1627,13 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, getDriverWithPool
func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithPool func(ctx context.Context, t *testing.T) riverdriver.Driver[TTx]) {
t.Helper()

connectListener := func(ctx context.Context, t *testing.T, listener riverdriver.Listener) {
t.Helper()

require.NoError(t, listener.Connect(ctx))
t.Cleanup(func() { require.NoError(t, listener.Close(ctx)) })
}

requireNoNotification := func(ctx context.Context, t *testing.T, listener riverdriver.Listener) {
t.Helper()

Expand All @@ -1653,11 +1657,20 @@ func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithP
return notification
}

t.Run("Close_NoOpIfNotConnected", func(t *testing.T) {
t.Parallel()

listener, _ := setupListener(ctx, t, getDriverWithPool)
require.NoError(t, listener.Close(ctx))
})

t.Run("RoundTrip", func(t *testing.T) {
t.Parallel()

listener, bundle := setupListener(ctx, t, getDriverWithPool)

connectListener(ctx, t, listener)

require.NoError(t, listener.Listen(ctx, "topic1"))
require.NoError(t, listener.Listen(ctx, "topic2"))

Expand Down Expand Up @@ -1695,6 +1708,8 @@ func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithP

listener, bundle := setupListener(ctx, t, getDriverWithPool)

connectListener(ctx, t, listener)

require.NoError(t, listener.Listen(ctx, "topic1"))

execTx, err := bundle.exec.Begin(ctx)
Expand All @@ -1711,6 +1726,23 @@ func ExerciseListener[TTx any](ctx context.Context, t *testing.T, getDriverWithP
notification := waitForNotification(ctx, t, listener)
require.Equal(t, &riverdriver.Notification{Topic: "topic1", Payload: "payload1"}, notification)
})

t.Run("MultipleReuse", func(t *testing.T) {
t.Parallel()

listener, _ := setupListener(ctx, t, getDriverWithPool)

connectListener(ctx, t, listener)

require.NoError(t, listener.Listen(ctx, "topic1"))
require.NoError(t, listener.Unlisten(ctx, "topic1"))

require.NoError(t, listener.Close(ctx))
require.NoError(t, listener.Connect(ctx))

require.NoError(t, listener.Listen(ctx, "topic1"))
require.NoError(t, listener.Unlisten(ctx, "topic1"))
})
}

// requireEqualTime compares to timestamps down the microsecond only. This is
Expand Down
10 changes: 6 additions & 4 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,14 @@ func (l *Listener) Close(ctx context.Context) error {
return nil
}

if err := l.conn.Conn().Close(ctx); err != nil {
return err
}
err := l.conn.Conn().Close(ctx)

// Regardless of the error state returned above, always release and unset
// the listener's local connection.
l.conn.Release()
l.conn = nil
return nil

return err
}

func (l *Listener) Connect(ctx context.Context) error {
Expand Down

0 comments on commit 02267da

Please sign in to comment.