Skip to content

Commit

Permalink
Merge pull request juju#17926 from SimonRichardson/state-base-context…
Browse files Browse the repository at this point in the history
…-run

juju#17926

RunAtomic starts to add the ability to call discrete state methods within a transaction scope, without exposing the underlying transaction implementation at the service layer.

It solves the problem of trying to execute business logic at the service layer without exposing sql.Tx or sqlair.TX types. The AtomicContext interface type is a typed context.Context that can be provided as a parameter to the Run closure.

The RunAtomic method gets a DB and then creates a transaction. Within that transaction, a txContext is then created that captures both the context and the sqlair.TX. This tuple can then be passed as an argument to the closure. The txContext ensures that the sqlair.TX can only be within the closure for the lifetime of the transaction. This is to prevent capturing the AtomicContext and using it outside of the RunAtomic method. This type of usage will cause an error later on in the state code if someone does attempt to use it. The tuple was defined this way, as it's not possible to modify a generic context.Value without creating a new context, or creating a bespoke implementation. Thus the tuple presents the best approach to ensuring safety and easy access.

It should be noted that because of go invariance it's not possible to swap a RunAtomic closure for the following type:

 st.RunAtomic(ctx, func(ctx context.Context) error { ... })

This will simply not compile. The RunAtomic function must be of instance type:

 st.RunAtomic(ctx, func(txCtx AtomicContext) error { ... })

Lastly, I've added a note that the AtomicContext implementation (txContext) can be pooled[1] to keep the allocations of the txContext low. That is not part of this changeset and can be done in future work.

 1. https://pkg.go.dev/sync#Pool

<!-- 
The PR title should match: <type>(optional <scope>): <description>.

Please also ensure all commits in this PR comply with our conventional commits specification:
https://docs.google.com/document/d/1SYUo9G7qZ_jdoVXpUVamS5VCgHmtZ0QA-wZxKoMS-C0 
-->

<!-- Why this change is needed and what it does. -->

## Checklist

<!-- If an item is not applicable, use `~strikethrough~`. -->

- [x] Code style: imports ordered, good names, simple structure, etc
- [x] Comments saying why design decisions were made
- [x] Go unit tests, with comments saying what you're testing

## QA steps

Nothing is using this, more patches will be added to build upon this infrastructure. 

## Links


**Jira card:** JUJU-
  • Loading branch information
jujubot authored Aug 16, 2024
2 parents 6ba4623 + ea5fca4 commit 1987248
Show file tree
Hide file tree
Showing 2 changed files with 382 additions and 1 deletion.
113 changes: 112 additions & 1 deletion domain/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package domain
import (
"context"
"database/sql"
"fmt"
"sync"

"github.com/canonical/sqlair"
Expand Down Expand Up @@ -99,9 +100,93 @@ func (st *StateBase) Prepare(query string, typeSamples ...any) (*sqlair.Statemen
return stmt, nil
}

// RunAtomic executes the closure function within the scope of a transaction.
// The closure is passed a AtomicContext that can be passed on to state
// functions, so that they can perform work within that same transaction. The
// closure will be retried according to the transaction retry semantics, if the
// transaction fails due to transient errors. The closure should only be used to
// perform state changes and must not be used to execute queries outside of the
// state scope. This includes performing goroutines or other async operations.
func (st *StateBase) RunAtomic(ctx context.Context, fn func(AtomicContext) error) error {
db, err := st.DB()
if err != nil {
return errors.Annotate(err, "getting database")
}

return db.Txn(ctx, func(ctx context.Context, tx *sqlair.TX) error {
// The atomicContext is created with the transaction and passed to the
// closure function. This ensures that the transaction is always
// available to the closure. Once the transaction is complete, the
// transaction is removed from the atomicContext. This is to prevent the
// transaction from being used outside of the transaction scope. This
// will prevent any references to the sqlair.TX from being held outside
// of the transaction scope.

// TODO (stickupkid): The atomicContext can be pooled on the StateBase
// to reduce the number of allocations. Attempting to push the tx into
// the context would prevent that as a viable option.
txCtx := &atomicContext{
Context: ctx,
tx: tx,
}
defer txCtx.close()

return fn(txCtx)
})
}

// AtomicStateBase is an interface that provides a method for executing a
// closure within the scope of a transaction.
type AtomicStateBase interface {
// RunAtomic executes the closure function within the scope of a
// transaction. The closure is passed a AtomicContext that can be passed on
// to state functions, so that they can perform work within that same
// transaction. The closure will be retried according to the transaction
// retry semantics, if the transaction fails due to transient errors. The
// closure should only be used to perform state changes and must not be used
// to execute queries outside of the state scope. This includes performing
// goroutines or other async operations.
RunAtomic(ctx context.Context, fn func(AtomicContext) error) error
}

// Run executes the closure function using the provided AtomicContext as the
// transaction context. It is expected that the closure will perform state
// changes within the transaction scope. Any errors returned from the closure
// are coerced into a standard error to prevent sqlair errors from being
// returned to the Service layer.
func Run(ctx AtomicContext, fn func(context.Context, *sqlair.TX) error) error {
txCtx, ok := ctx.(*atomicContext)
if !ok {
// If you're seeing this error, it means that the atomicContext was not
// created by RunAtomic. This is a programming error. Did you attempt to
// wrap the context in a custom context and pass it to Run?
return fmt.Errorf("programmatic error: AtomicContext is not a *atomicContext: %T", ctx)
}

// Ensure that we can lock the context for the duration of the run function.
// This is to prevent the transaction from being removed from the context
// or the service layer from attempting to use the transaction outside of
// the transaction scope.
txCtx.mu.Lock()
defer txCtx.mu.Unlock()

tx := txCtx.tx
if tx == nil {
// If you're seeing this error, it means that the AtomicContext was not
// created by RunAtomic. This is a programming error. Did you capture
// the AtomicContext from a RunAtomic closure and try to use it outside
// of the closure?
return fmt.Errorf("programmatic error: AtomicContext does not have a transaction")
}

// Execute the function with the transaction.
// Coerce the error to ensure that no sql or sqlair errors are returned
// from the function and into the Service layer.
return CoerceError(fn(ctx, tx))
}

// txnRunner is a wrapper around a database.TxnRunner that implements the
// database.TxnRunner interface.
// It is used to coerce the error returned by the database.TxnRunner into a
type txnRunner struct {
runner database.TxnRunner
}
Expand All @@ -119,3 +204,29 @@ func (r *txnRunner) Txn(ctx context.Context, fn func(context.Context, *sqlair.TX
func (r *txnRunner) StdTxn(ctx context.Context, fn func(context.Context, *sql.Tx) error) error {
return CoerceError(r.runner.StdTxn(ctx, fn))
}

// AtomicContext is a typed context that provides access to the database transaction
// for the duration of a transaction.
type AtomicContext interface {
context.Context
}

// atomicContext is the concrete implementation of the AtomicContext interface.
// The atomicContext ensures that a transaction is always available to during
// the execution of a transaction. The atomicContext stores the sqlair.TX
// directly on the struct to prevent the need to fork the context during the
// transaction. The mutex prevents data-races when the transaction is removed
// from the context.
type atomicContext struct {
context.Context

mu sync.Mutex
tx *sqlair.TX
}

func (c *atomicContext) close() {
c.mu.Lock()
defer c.mu.Unlock()

c.tx = nil
}
Loading

0 comments on commit 1987248

Please sign in to comment.