Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sql: add Database.WithConnection #6445

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 118 additions & 18 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,24 +572,83 @@
type Database interface {
Executor
QueryCache
// Close closes the database.
Close() error
// QueryCount returns the number of queries executed on the database.
QueryCount() int
// QueryCache returns the query cache for this database, if it's present,
// or nil otherwise.
QueryCache() QueryCache
// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
Tx(ctx context.Context) (Transaction, error)
// WithTx starts a new transaction and passes it to the exec function.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTx(ctx context.Context, exec func(Transaction) error) error
// TxImmediate begins a new immediate transaction on the database, that is,
// a transaction that starts a write immediately without waiting for a write
// statement.
// The transaction returned from this function must always be released by calling
// its Release method. Release rolls back the transaction if it hasn't been
// committed.
// If the context is canceled, the currently running SQL statement is interrupted.
TxImmediate(ctx context.Context) (Transaction, error)
// WithTxImmediate starts a new immediate transaction and passes it to the exec
// function.
// An immediate transaction is started immediately, without waiting for a write
// statement.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTxImmediate(ctx context.Context, exec func(Transaction) error) error
// Connection returns a connection from the database pool.
// If many queries are to be executed in a row, but there's no need for an
// explicit transaction which may be long-running and thus block
// WAL checkpointing, it may be preferable to use a single connection for
// it to avoid database pool overhead.
// The connection needs to be always returned to the pool by calling its Release
// method.
// If the context is canceled, the currently running SQL statement is interrupted.
Connection(ctx context.Context) (Connection, error)
// WithConnection executes the provided function with a connection from the
// database pool.
// The connection is released back to the pool after the function returns.
// If the context is canceled, the currently running SQL statement is interrupted.
WithConnection(ctx context.Context, exec func(Connection) error) error
// Intercept adds an interceptor function to the database. The interceptor
// functions are invoked upon each query on the database, including queries
// executed within transactions.
// The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Intercept(key string, fn Interceptor)
// RemoveInterceptor removes the interceptor function with specified key from the database.
RemoveInterceptor(key string)
}

// Transaction represents a transaction.
type Transaction interface {
Executor
// Commit commits the transaction.
Commit() error
// Release releases the transaction. If the transaction hasn't been committed,
// it's rolled back.
Release() error
}

// Connection represents a database connection.
type Connection interface {
Executor
// Release releases the connection back to the connection pool.
Release()
}

type sqliteDatabase struct {
*queryCache
pool *sqlitex.Pool
Expand Down Expand Up @@ -684,34 +743,22 @@
return nil
}

// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
// Tx implements Database.
func (db *sqliteDatabase) Tx(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginDefault)
}

// WithTx will pass initialized deferred transaction to exec callback.
// Will commit only if error is nil.
// WithTx implements Database.
func (db *sqliteDatabase) WithTx(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginDefault, exec)
}

// TxImmediate creates immediate transaction.
//
// IMMEDIATE cause the database connection to start a new write immediately, without waiting
// for a write statement. The BEGIN IMMEDIATE might fail with SQLITE_BUSY if another write
// transaction is already active on another database connection.
// TxImmediate implements Database.
func (db *sqliteDatabase) TxImmediate(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginImmediate)
}

// WithTxImmediate will pass initialized immediate transaction to exec callback.
// Will commit only if error is nil.
// WithTxImmediate implements Database.
func (db *sqliteDatabase) WithTxImmediate(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginImmediate, exec)
}
Expand All @@ -727,7 +774,7 @@
return nil
}

// Exec statement using one of the connection from the pool.
// Exec implements Executor.
//
// If you care about atomicity of the operation (for example writing rewards to multiple accounts)
// Tx should be used. Otherwise sqlite will not guarantee that all side-effects of operations are
Expand Down Expand Up @@ -758,7 +805,7 @@
return exec(conn, query, encoder, decoder)
}

// Close closes all pooled connections.
// Close implements Database.
func (db *sqliteDatabase) Close() error {
db.closeMux.Lock()
defer db.closeMux.Unlock()
Expand All @@ -772,6 +819,30 @@
return nil
}

// Connection implements Database.
func (db *sqliteDatabase) Connection(ctx context.Context) (Connection, error) {
if db.closed {
return nil, ErrClosed
}

Check warning on line 826 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L825-L826

Added lines #L825 - L826 were not covered by tests
conCtx, cancel := context.WithCancel(ctx)
conn := db.getConn(conCtx)
if conn == nil {
cancel()
return nil, ErrNoConnection
}

Check warning on line 832 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L830-L832

Added lines #L830 - L832 were not covered by tests
return &sqliteConn{queryCache: db.queryCache, db: db, conn: conn, freeConn: cancel}, nil
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Connection) error) error {
conn, err := db.Connection(ctx)
if err != nil {
return err
}

Check warning on line 841 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L840-L841

Added lines #L840 - L841 were not covered by tests
defer conn.Release()
return exec(conn)
}

// Intercept adds an interceptor function to the database. The interceptor functions
// are invoked upon each query. The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Expand Down Expand Up @@ -1093,6 +1164,35 @@
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
freeConn func()
}

var _ Connection = &sqliteConn{}

func (c *sqliteConn) Release() {
c.freeConn()
c.db.pool.Put(c.conn)
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}

Check warning on line 1184 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L1183-L1184

Added lines #L1183 - L1184 were not covered by tests

c.db.queryCount.Add(1)
if c.db.latency != nil {
start := time.Now()
defer func() {
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()

Check warning on line 1191 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L1188-L1191

Added lines #L1188 - L1191 were not covered by tests
}
return exec(c.conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
switch sqlite.ErrCode(err) {
case sqlite.SQLITE_CONSTRAINT_PRIMARYKEY, sqlite.SQLITE_CONSTRAINT_UNIQUE:
Expand Down
37 changes: 34 additions & 3 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ func Test_Migration_Rollback(t *testing.T) {
migration1.EXPECT().Apply(gomock.Any(), gomock.Any()).Return(nil)
migration2.EXPECT().Apply(gomock.Any(), gomock.Any()).Return(errors.New("migration 2 failed"))

migration2.EXPECT().Rollback().Return(nil)

dbFile := filepath.Join(t.TempDir(), "test.sql")
_, err := Open("file:"+dbFile,
WithDatabaseSchema(&Schema{
Expand Down Expand Up @@ -129,7 +127,6 @@ func Test_Migration_Rollback_Only_NewMigrations(t *testing.T) {
migration2.EXPECT().Name().Return("test").AnyTimes()
migration2.EXPECT().Order().Return(2).AnyTimes()
migration2.EXPECT().Apply(gomock.Any(), gomock.Any()).Return(errors.New("migration 2 failed"))
migration2.EXPECT().Rollback().Return(nil)

_, err = Open("file:"+dbFile,
WithLogger(logger),
Expand Down Expand Up @@ -638,3 +635,37 @@ func TestExclusive(t *testing.T) {
})
}
}

func TestConnection(t *testing.T) {
db := InMemoryTest(t)
c, err := db.Connection(context.Background())
require.NoError(t, err)
var r int
n, err := c.Exec("select ?", func(stmt *Statement) {
stmt.BindInt64(1, 42)
}, func(stmt *Statement) bool {
r = stmt.ColumnInt(0)
return true
})
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 42, r)
c.Release()
fasmat marked this conversation as resolved.
Show resolved Hide resolved

require.NoError(t, db.WithConnection(context.Background(), func(c Connection) error {
n, err := c.Exec("select ?", func(stmt *Statement) {
stmt.BindInt64(1, 42)
}, func(stmt *Statement) bool {
r = stmt.ColumnInt(0)
return true
})
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 42, r)
return nil
}))

require.Error(t, db.WithConnection(context.Background(), func(c Connection) error {
return errors.New("error")
}))
}
5 changes: 4 additions & 1 deletion sql/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import "go.uber.org/zap"

// Executor is an interface for executing raw statement.
type Executor interface {
// Exec executes a statement.
Exec(string, Encoder, Decoder) (int, error)
}

// Migration is interface for migrations provider.
type Migration interface {
// Apply applies the migration.
Apply(db Executor, logger *zap.Logger) error
Rollback() error
fasmat marked this conversation as resolved.
Show resolved Hide resolved
// Name returns the name of the migration.
Name() string
// Order returns the sequential number of the migration.
Order() int
}
5 changes: 0 additions & 5 deletions sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ func (m *sqlMigration) Order() int {
return m.order
}

func (sqlMigration) Rollback() error {
// handled by the DB itself
return nil
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved

func version(db Executor) (int, error) {
var current int
if _, err := db.Exec("PRAGMA user_version;", nil, func(stmt *Statement) bool {
Expand Down
38 changes: 0 additions & 38 deletions sql/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions sql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -31,14 +30,17 @@ func LoadDBSchemaScript(db Executor) (string, error) {
return "", err
}
fmt.Fprintf(&sb, "PRAGMA user_version = %d;\n", version)
// The following SQL query ensures that tables are listed first,
// ordered by name, and then all other objects, ordered by their table name
// and then by their own name.
if _, err = db.Exec(`
SELECT tbl_name, sql || ';'
FROM sqlite_master
WHERE sql IS NOT NULL AND tbl_name NOT LIKE 'sqlite_%'
ORDER BY
CASE WHEN type = 'table' THEN 1 ELSE 2 END, -- ensures tables are first
tbl_name, -- tables are sorted by name, then all other objects
name -- (indexes, triggers, etc.) also by name
CASE WHEN type = 'table' THEN 1 ELSE 2 END,
tbl_name,
name
`, nil, func(st *Statement) bool {
fmt.Fprintln(&sb, st.ColumnText(1))
return true
Expand Down Expand Up @@ -143,20 +145,13 @@ func (s *Schema) Migrate(logger *zap.Logger, db Database, before, vacuumState in
db.Intercept("logQueries", logQueryInterceptor(logger))
defer db.RemoveInterceptor("logQueries")
}
for i, m := range s.Migrations {
for _, m := range s.Migrations {
if m.Order() <= before {
continue
}
if err := db.WithTxImmediate(context.Background(), func(tx Transaction) error {
if _, ok := s.skipMigration[m.Order()]; !ok {
if err := m.Apply(tx, logger); err != nil {
for j := i; j >= 0 && s.Migrations[j].Order() > before; j-- {
if e := s.Migrations[j].Rollback(); e != nil {
err = errors.Join(err, fmt.Errorf("rollback %s: %w", m.Name(), e))
break
}
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved

return fmt.Errorf("apply %s: %w", m.Name(), err)
}
}
Expand Down
4 changes: 0 additions & 4 deletions sql/statesql/migrations/state_0021_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ func (*migration0021) Order() int {
return 21
}

func (*migration0021) Rollback() error {
return nil
}

func (m *migration0021) Apply(db sql.Executor, logger *zap.Logger) error {
if err := m.applySql(db); err != nil {
return err
Expand Down
Loading
Loading