Skip to content

Commit

Permalink
Fix bug in SharedTx when Query returns on error condition (#311)
Browse files Browse the repository at this point in the history
Fix a hard-to-spot but long running bug in `SharedTx` in which when
`Query` was invoked, the standard success case unlocked the `SharedTx`
as expected, but if returning under error condition, the parent
`SharedTx` would not be unlocked.

This wasn't firing most of the time, but could occur in some stress
tests where maintenance services would occasionally successfully make it
into their work loop before their context was cancelled. The context
cancellation would error `Query`, not unlock `SharedTx`, and cause an
error if another service then later on entered its work loop and tried
to use `SharedTx`.
  • Loading branch information
brandur authored Apr 21, 2024
1 parent e791aef commit 1200f71
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/riverinternaltest/sharedtx/shared_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ func (e *SharedTx) Exec(ctx context.Context, query string, args ...any) (pgconn.

func (e *SharedTx) Query(ctx context.Context, query string, args ...any) (pgx.Rows, error) {
e.lock()
// no unlock until rows close
// no unlock until rows close or return on error condition

rows, err := e.inner.Query(ctx, query, args...)
if err != nil {
e.unlock()
return nil, err
}

Expand Down
23 changes: 23 additions & 0 deletions internal/riverinternaltest/sharedtx/shared_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,27 @@ func TestSharedTx(t *testing.T) {

require.Len(t, sharedTx.wait, 1)
})

// Checks specifically that the shared transaction is unlocked correctly on
// the Query function's error path (normally it's unlocked when the returned
// rows struct is closed, so an additional unlock operation is required).
t.Run("QueryUnlocksOnError", func(t *testing.T) {
t.Parallel()

sharedTx := setup(t)

{
// Roll back the transaction so using it returns an error.
require.NoError(t, sharedTx.inner.Rollback(ctx))

_, err := sharedTx.Query(ctx, "SELECT 1") //nolint:sqlclosecheck
require.ErrorIs(t, err, pgx.ErrTxClosed)

select {
case <-sharedTx.wait:
default:
require.FailNow(t, "Should have been a value in shared transaction's wait channel")
}
}
})
}

0 comments on commit 1200f71

Please sign in to comment.