From 8b4201fcdd2b667808d38d8c4e22ab9b31ce697d Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 30 Oct 2024 09:15:20 -0500 Subject: [PATCH] riverpgxv5: hijack raw listener conn to assume control (#661) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation of the driver would `Acquire()` a conn from the `pgxpool.Pool`, but keep the pool in control of the conn. This meant that the pool would still do its regular maintenance checks on the conn, such as closing it after it had reached its max lifetime. This isn't actually desirable in the case of a `LISTEN` listener—we want the listener to stay alive as long as possible and to avoid missing any notifications. We perform our own health checks on the conn in the form of periodic pings, which should be sufficient to make sure the conn is still connected and functioning properly. As such, adjust the driver so that it calls `Hijack()` on the conn immediately after acquiring it, assuming full control of it from the underlying pool. The listener is still responsible for closing the conn at shutdown. Fixes #660. --- CHANGELOG.md | 4 ++++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8613022..713fc2b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661). + ## [0.13.0] - 2024-10-07 ⚠️ Version 0.13.0 removes the original advisory lock based unique jobs implementation that was deprecated in v0.12.0. See details in the note below or the v0.12.0 release notes. diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index dcad104c..29292b3a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -735,7 +735,7 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { } type Listener struct { - conn *pgxpool.Conn + conn *pgx.Conn dbPool *pgxpool.Pool prefix string mu sync.Mutex @@ -753,11 +753,10 @@ func (l *Listener) Close(ctx context.Context) error { // connection back into rotation, but in case a Listen was invoked without a // subsequent Unlisten on the same topic, close the connection explicitly to // guarantee no other caller will receive a partially tainted connection. - err := l.conn.Conn().Close(ctx) + err := l.conn.Close(ctx) // Even in the event of an error, make sure conn is set back to nil so that // the listener can be reused. - l.conn.Release() l.conn = nil return err @@ -771,19 +770,22 @@ func (l *Listener) Connect(ctx context.Context) error { return errors.New("connection already established") } - conn, err := l.dbPool.Acquire(ctx) + poolConn, err := l.dbPool.Acquire(ctx) if err != nil { return err } var schema string - if err := conn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil { - conn.Release() + if err := poolConn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil { + poolConn.Release() return err } l.prefix = schema + "." - l.conn = conn + // Assume full ownership of the conn so that it doesn't get released back to + // the pool or auto-closed by the pool. + l.conn = poolConn.Hijack() + return nil } @@ -814,7 +816,7 @@ func (l *Listener) WaitForNotification(ctx context.Context) (*riverdriver.Notifi l.mu.Lock() defer l.mu.Unlock() - notification, err := l.conn.Conn().WaitForNotification(ctx) + notification, err := l.conn.WaitForNotification(ctx) if err != nil { return nil, err }