diff --git a/CHANGELOG.md b/CHANGELOG.md index c8613022..713fc2b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661). + ## [0.13.0] - 2024-10-07 ⚠️ Version 0.13.0 removes the original advisory lock based unique jobs implementation that was deprecated in v0.12.0. See details in the note below or the v0.12.0 release notes. diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index dcad104c..29292b3a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -735,7 +735,7 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { } type Listener struct { - conn *pgxpool.Conn + conn *pgx.Conn dbPool *pgxpool.Pool prefix string mu sync.Mutex @@ -753,11 +753,10 @@ func (l *Listener) Close(ctx context.Context) error { // connection back into rotation, but in case a Listen was invoked without a // subsequent Unlisten on the same topic, close the connection explicitly to // guarantee no other caller will receive a partially tainted connection. - err := l.conn.Conn().Close(ctx) + err := l.conn.Close(ctx) // Even in the event of an error, make sure conn is set back to nil so that // the listener can be reused. - l.conn.Release() l.conn = nil return err @@ -771,19 +770,22 @@ func (l *Listener) Connect(ctx context.Context) error { return errors.New("connection already established") } - conn, err := l.dbPool.Acquire(ctx) + poolConn, err := l.dbPool.Acquire(ctx) if err != nil { return err } var schema string - if err := conn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil { - conn.Release() + if err := poolConn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil { + poolConn.Release() return err } l.prefix = schema + "." - l.conn = conn + // Assume full ownership of the conn so that it doesn't get released back to + // the pool or auto-closed by the pool. + l.conn = poolConn.Hijack() + return nil } @@ -814,7 +816,7 @@ func (l *Listener) WaitForNotification(ctx context.Context) (*riverdriver.Notifi l.mu.Lock() defer l.mu.Unlock() - notification, err := l.conn.Conn().WaitForNotification(ctx) + notification, err := l.conn.WaitForNotification(ctx) if err != nil { return nil, err }