Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrations: use a txn + commit for each migration, deprecate MigrateTx #600

Merged
merged 3 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
}
```

- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version.

- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).

## Fixed
Expand Down
23 changes: 12 additions & 11 deletions rivermigrate/example_migrate_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@ import (
func Example_migrateDatabaseSQL() {
ctx := context.Background()

dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example_dbsql"
url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName
dbPool, err := sql.Open("pgx", url)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.BeginTx(ctx, nil)
driver := riverdatabasesql.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback()

migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand Down
32 changes: 17 additions & 15 deletions rivermigrate/example_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand All @@ -17,26 +18,29 @@ import (
func Example_migrate() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example"
poolConfig := riverinternaltest.DatabaseConfig("river_test_example")
poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName

dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.Begin(ctx)
driver := riverpgxv5.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback(ctx)

migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -46,7 +50,7 @@ func Example_migrate() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -56,7 +60,7 @@ func Example_migrate() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand All @@ -73,10 +77,8 @@ func Example_migrate() {
// Migrated [DOWN] version 1
}

func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) {
_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
TargetVersion: -1,
})
func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) {
_, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;")
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/001_first.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TYPE foobar;
2 changes: 2 additions & 0 deletions rivermigrate/migration/commit_required/001_first.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- create a foobar enum with values foo, bar:
CREATE TYPE foobar AS ENUM ('foo', 'bar');
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/002_second.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- not truly reversible, can't remove enum values.
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/002_second.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE foobar ADD VALUE 'baz' AFTER 'bar';
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/003_third.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP FUNCTION foobar_in_bitmask;
15 changes: 15 additions & 0 deletions rivermigrate/migration/commit_required/003_third.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE OR REPLACE FUNCTION foobar_in_bitmask(bitmask BIT(8), val foobar)
RETURNS boolean
LANGUAGE SQL
IMMUTABLE
AS $$
SELECT CASE val
WHEN 'foo' THEN get_bit(bitmask, 7)
WHEN 'bar' THEN get_bit(bitmask, 6)
-- Because the enum value 'baz' was added in migration 2 and not part
-- of the original enum, we can't use it in an immutable SQL function
-- unless the new enum value migration has been committed.
WHEN 'baz' THEN get_bit(bitmask, 5)
ELSE 0
END = 1;
$$;
37 changes: 25 additions & 12 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,15 @@ func (m *Migrator[TTx]) GetVersion(version int) (Migration, error) {
// // handle error
// }
func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, exec riverdriver.ExecutorTx) (*MigrateResult, error) {
switch direction {
case DirectionDown:
return m.migrateDown(ctx, exec, direction, opts)
case DirectionUp:
return m.migrateUp(ctx, exec, direction, opts)
}
exec := m.driver.GetExecutor()
switch direction {
case DirectionDown:
return m.migrateDown(ctx, exec, direction, opts)
case DirectionUp:
return m.migrateUp(ctx, exec, direction, opts)
}

panic("invalid direction: " + direction)
})
panic("invalid direction: " + direction)
}

// Migrate migrates the database in the given direction (up or down). The opts
Expand All @@ -327,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *
// This variant lets a caller run migrations within a transaction. Postgres DDL
// is transactional, so migration changes aren't visible until the transaction
// commits, and are rolled back if the transaction rolls back.
//
// Deprecated: Use Migrate instead. Certain migrations cannot be batched together
// in a single transaction, so this method is not recommended.
func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
switch direction {
case DirectionDown:
Expand Down Expand Up @@ -560,10 +562,21 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex

if !opts.DryRun {
start := time.Now()
_, err := exec.Exec(ctx, sql)

// Similar to ActiveRecord migrations, we wrap each individual migration
// in its own transaction. Without this, certain migrations that require
// a commit on a preexisting operation (such as adding an enum value to be
// used in an immutable function) cannot succeed.
err := dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error {
_, err := exec.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("error applying version %03d [%s]: %w",
versionBundle.Version, strings.ToUpper(string(direction)), err)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error applying version %03d [%s]: %w",
versionBundle.Version, strings.ToUpper(string(direction)), err)
return nil, err
}
duration = time.Since(start)
}
Expand Down
Loading
Loading