From e3b3b557acee3c5b3fc93ae1c6130739637f7374 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 17 Sep 2024 21:16:41 -0500 Subject: [PATCH 1/3] illustrate migrator issue with using uncommitted enum value --- .../commit_required/001_first.down.sql | 1 + .../commit_required/001_first.up.sql | 2 + .../commit_required/002_second.down.sql | 1 + .../commit_required/002_second.up.sql | 1 + .../commit_required/003_third.down.sql | 1 + .../commit_required/003_third.up.sql | 15 ++++++ rivermigrate/river_migrate_test.go | 46 +++++++++++++++++-- 7 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 rivermigrate/migration/commit_required/001_first.down.sql create mode 100644 rivermigrate/migration/commit_required/001_first.up.sql create mode 100644 rivermigrate/migration/commit_required/002_second.down.sql create mode 100644 rivermigrate/migration/commit_required/002_second.up.sql create mode 100644 rivermigrate/migration/commit_required/003_third.down.sql create mode 100644 rivermigrate/migration/commit_required/003_third.up.sql diff --git a/rivermigrate/migration/commit_required/001_first.down.sql b/rivermigrate/migration/commit_required/001_first.down.sql new file mode 100644 index 00000000..3b21ed14 --- /dev/null +++ b/rivermigrate/migration/commit_required/001_first.down.sql @@ -0,0 +1 @@ +DROP TYPE foobar; diff --git a/rivermigrate/migration/commit_required/001_first.up.sql b/rivermigrate/migration/commit_required/001_first.up.sql new file mode 100644 index 00000000..4646c8ea --- /dev/null +++ b/rivermigrate/migration/commit_required/001_first.up.sql @@ -0,0 +1,2 @@ +-- create a foobar enum with values foo, bar: +CREATE TYPE foobar AS ENUM ('foo', 'bar'); diff --git a/rivermigrate/migration/commit_required/002_second.down.sql b/rivermigrate/migration/commit_required/002_second.down.sql new file mode 100644 index 00000000..8c3007ce --- /dev/null +++ b/rivermigrate/migration/commit_required/002_second.down.sql @@ -0,0 +1 @@ +-- not truly reversible, can't remove enum values. diff --git a/rivermigrate/migration/commit_required/002_second.up.sql b/rivermigrate/migration/commit_required/002_second.up.sql new file mode 100644 index 00000000..c1cd1631 --- /dev/null +++ b/rivermigrate/migration/commit_required/002_second.up.sql @@ -0,0 +1 @@ +ALTER TYPE foobar ADD VALUE 'baz' AFTER 'bar'; diff --git a/rivermigrate/migration/commit_required/003_third.down.sql b/rivermigrate/migration/commit_required/003_third.down.sql new file mode 100644 index 00000000..898efaf7 --- /dev/null +++ b/rivermigrate/migration/commit_required/003_third.down.sql @@ -0,0 +1 @@ +DROP FUNCTION foobar_in_bitmask; diff --git a/rivermigrate/migration/commit_required/003_third.up.sql b/rivermigrate/migration/commit_required/003_third.up.sql new file mode 100644 index 00000000..44e9a824 --- /dev/null +++ b/rivermigrate/migration/commit_required/003_third.up.sql @@ -0,0 +1,15 @@ +CREATE OR REPLACE FUNCTION foobar_in_bitmask(bitmask BIT(8), val foobar) +RETURNS boolean +LANGUAGE SQL +IMMUTABLE +AS $$ + SELECT CASE val + WHEN 'foo' THEN get_bit(bitmask, 7) + WHEN 'bar' THEN get_bit(bitmask, 6) + -- Because the enum value 'baz' was added in migration 2 and not part + -- of the original enum, we can't use it in an immutable SQL function + -- unless the new enum value migration has been committed. + WHEN 'baz' THEN get_bit(bitmask, 5) + ELSE 0 + END = 1; +$$; diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index b397d64f..3c6c752c 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -29,8 +29,10 @@ import ( const ( // The name of an actual migration line embedded in our test data below. - migrationLineAlternate = "alternate" - migrationLineAlternateMaxVersion = 6 + migrationLineAlternate = "alternate" + migrationLineAlternateMaxVersion = 6 + migrationLineCommitRequired = "commit_required" + migrationLineCommitRequiredMaxVersion = 3 ) //go:embed migration/*/*.sql @@ -50,12 +52,14 @@ func (d *driverWithAlternateLine) GetMigrationFS(line string) fs.FS { return migrationFS case migrationLineAlternate + "2": panic(line + " is only meant for testing line suggestions") + case migrationLineCommitRequired: + return migrationFS } panic("migration line does not exist: " + line) } func (d *driverWithAlternateLine) GetMigrationLines() []string { - return append(d.Driver.GetMigrationLines(), migrationLineAlternate, migrationLineAlternate+"2") + return append(d.Driver.GetMigrationLines(), migrationLineAlternate, migrationLineAlternate+"2", migrationLineCommitRequired) } func TestMigrator(t *testing.T) { @@ -720,6 +724,42 @@ func TestMigrator(t *testing.T) { _, err = bundle.tx.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLDown) require.NoError(t, err) }) + + t.Run("MigrationsWithCommitRequired", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + t.Cleanup(func() { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + // Clean up the types we created. + _, err = tx.Exec(ctx, "DROP FUNCTION IF EXISTS foobar_in_bitmask") + require.NoError(t, err) + + _, err = tx.Exec(ctx, "DROP TYPE IF EXISTS foobar") + require.NoError(t, err) + + _, err = tx.Exec(ctx, "DELETE FROM river_migration WHERE line = $1", migrationLineCommitRequired) + require.NoError(t, err) + + require.NoError(t, tx.Commit(ctx)) + }) + + // We have to reinitialize the commitRequiredMigrator because the migrations + // bundle is set in the constructor. + commitRequiredMigrator, err := New(bundle.driver, &Config{ + Line: migrationLineCommitRequired, + Logger: bundle.logger, + }) + require.NoError(t, err) + + res, err := commitRequiredMigrator.Migrate(ctx, DirectionUp, nil) + require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) + require.Equal(t, []int{1, 2, 3}, sliceutil.Map(res.Versions, migrateVersionToInt)) + }) } // This test uses a custom set of test-only migration files on the file system From 30d06916a220eb6c1db60ceeeef29c8cfc716335 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 17 Sep 2024 21:45:47 -0500 Subject: [PATCH 2/3] fix non-tx Migrate using a txn + commit for each migration --- rivermigrate/river_migrate.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 0e6702ce..4eb9df8d 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -297,16 +297,15 @@ func (m *Migrator[TTx]) GetVersion(version int) (Migration, error) { // // handle error // } func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *MigrateOpts) (*MigrateResult, error) { - return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, exec riverdriver.ExecutorTx) (*MigrateResult, error) { - switch direction { - case DirectionDown: - return m.migrateDown(ctx, exec, direction, opts) - case DirectionUp: - return m.migrateUp(ctx, exec, direction, opts) - } + exec := m.driver.GetExecutor() + switch direction { + case DirectionDown: + return m.migrateDown(ctx, exec, direction, opts) + case DirectionUp: + return m.migrateUp(ctx, exec, direction, opts) + } - panic("invalid direction: " + direction) - }) + panic("invalid direction: " + direction) } // Migrate migrates the database in the given direction (up or down). The opts @@ -560,10 +559,22 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex if !opts.DryRun { start := time.Now() - _, err := exec.Exec(ctx, sql) + + // Similar to ActiveRecord migrations, we wrap each individual migration + // in its own transaction. Without this, certain migrations that require + // a commit on a preexisting operation (such as adding an enum value to be + // used in an immutable function) cannot succeed. + err := dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error { + _, err := exec.Exec(ctx, sql) + if err != nil { + return fmt.Errorf("error applying version %03d [%s]: %w", + versionBundle.Version, strings.ToUpper(string(direction)), err) + } + return nil + }) + if err != nil { - return nil, fmt.Errorf("error applying version %03d [%s]: %w", - versionBundle.Version, strings.ToUpper(string(direction)), err) + return nil, err } duration = time.Since(start) } From c0de0ae4206062f67298666dcf3765b0e7e8896c Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 18 Sep 2024 15:20:03 -0500 Subject: [PATCH 3/3] deprecate MigrateTx, convert tests to use schemas As detailed in #600, there are certain combinations of schema changes which are not allowed to be run within the same transaction. The example we encountered with #590 is adding a new enum value, then using it in an immutable function during a subsequent migration. In Postgres, these must be separated by a commit. There are other examples of things which cannot be run in a transaction, such as `CREATE INDEX CONCURRENTLY`. While that specific one isn't solved here, moving away from a migrator that bundles migrations into a single transaction will also allow us to update our migration system to exclude certain migrations from transactions and i.e. add indexes concurrently. --- CHANGELOG.md | 2 + .../example_migrate_database_sql_test.go | 23 +- rivermigrate/example_migrate_test.go | 32 +- rivermigrate/river_migrate.go | 4 +- rivermigrate/river_migrate_test.go | 289 +++++++++++------- rivershared/util/randutil/rand_util.go | 9 + 6 files changed, 217 insertions(+), 142 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be82f73c..1edfca1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 } ``` +- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version. + - The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558). ## Fixed diff --git a/rivermigrate/example_migrate_database_sql_test.go b/rivermigrate/example_migrate_database_sql_test.go index 9fd04cd7..7751971b 100644 --- a/rivermigrate/example_migrate_database_sql_test.go +++ b/rivermigrate/example_migrate_database_sql_test.go @@ -18,26 +18,27 @@ import ( func Example_migrateDatabaseSQL() { ctx := context.Background() - dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example")) + // Use a dedicated Postgres schema for this example so we can migrate and drop it at will: + schemaName := "migration_example_dbsql" + url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName + dbPool, err := sql.Open("pgx", url) if err != nil { panic(err) } defer dbPool.Close() - tx, err := dbPool.BeginTx(ctx, nil) + driver := riverdatabasesql.New(dbPool) + migrator, err := rivermigrate.New(driver, nil) if err != nil { panic(err) } - defer tx.Rollback() - migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil) - if err != nil { + // Create the schema used for this example. Drop it when we're done. + // This isn't necessary outside this test. + if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil { panic(err) } - - // Our test database starts with a full River schema. Drop it so that we can - // demonstrate working migrations. This isn't necessary outside this test. - dropRiverSchema(ctx, migrator, tx) + defer dropRiverSchema(ctx, driver, schemaName) printVersions := func(res *rivermigrate.MigrateResult) { for _, version := range res.Versions { @@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() { // Migrate to version 3. An actual call may want to omit all MigrateOpts, // which will default to applying all available up migrations. - res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ TargetVersion: 3, }) if err != nil { @@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() { // Migrate down by three steps. Down migrating defaults to running only one // step unless overridden by an option like MaxSteps or TargetVersion. - res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ MaxSteps: 3, }) if err != nil { diff --git a/rivermigrate/example_migrate_test.go b/rivermigrate/example_migrate_test.go index 47b572bb..b9cdf6b0 100644 --- a/rivermigrate/example_migrate_test.go +++ b/rivermigrate/example_migrate_test.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivermigrate" ) @@ -17,26 +18,29 @@ import ( func Example_migrate() { ctx := context.Background() - dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example")) + // Use a dedicated Postgres schema for this example so we can migrate and drop it at will: + schemaName := "migration_example" + poolConfig := riverinternaltest.DatabaseConfig("river_test_example") + poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName + + dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig) if err != nil { panic(err) } defer dbPool.Close() - tx, err := dbPool.Begin(ctx) + driver := riverpgxv5.New(dbPool) + migrator, err := rivermigrate.New(driver, nil) if err != nil { panic(err) } - defer tx.Rollback(ctx) - migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) - if err != nil { + // Create the schema used for this example. Drop it when we're done. + // This isn't necessary outside this test. + if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil { panic(err) } - - // Our test database starts with a full River schema. Drop it so that we can - // demonstrate working migrations. This isn't necessary outside this test. - dropRiverSchema(ctx, migrator, tx) + defer dropRiverSchema(ctx, driver, schemaName) printVersions := func(res *rivermigrate.MigrateResult) { for _, version := range res.Versions { @@ -46,7 +50,7 @@ func Example_migrate() { // Migrate to version 3. An actual call may want to omit all MigrateOpts, // which will default to applying all available up migrations. - res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ TargetVersion: 3, }) if err != nil { @@ -56,7 +60,7 @@ func Example_migrate() { // Migrate down by three steps. Down migrating defaults to running only one // step unless overridden by an option like MaxSteps or TargetVersion. - res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ MaxSteps: 3, }) if err != nil { @@ -73,10 +77,8 @@ func Example_migrate() { // Migrated [DOWN] version 1 } -func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) { - _, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ - TargetVersion: -1, - }) +func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) { + _, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;") if err != nil { panic(err) } diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 4eb9df8d..e7e03aa5 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -326,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts * // This variant lets a caller run migrations within a transaction. Postgres DDL // is transactional, so migration changes aren't visible until the transaction // commits, and are rolled back if the transaction rolls back. +// +// Deprecated: Use Migrate instead. Certain migrations cannot be batched together +// in a single transaction, so this method is not recommended. func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) { switch direction { case DirectionDown: @@ -572,7 +575,6 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex } return nil }) - if err != nil { return nil, err } diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 3c6c752c..2fb96dac 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -24,6 +24,7 @@ import ( "github.com/riverqueue/river/riverdriver/riverdatabasesql" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/sliceutil" ) @@ -73,31 +74,37 @@ func TestMigrator(t *testing.T) { dbPool *pgxpool.Pool driver *driverWithAlternateLine logger *slog.Logger - tx pgx.Tx } setup := func(t *testing.T) (*Migrator[pgx.Tx], *testBundle) { t.Helper() - // The test suite largely works fine with test transactions, but due to - // the invasive nature of changing schemas, it's quite easy to have test - // transactions deadlock with each other as they run in parallel. Here - // we use test DBs instead of test transactions, but this could be - // changed to test transactions as long as test cases were made to run - // non-parallel. - dbPool := riverinternaltest.TestDB(ctx, t) + // Not all migrations can be executed together in a single transaction. + // Examples include `CREATE INDEX CONCURRENTLY`, or adding an enum value + // that's used by a later migration. As such, the migrator and its tests + // must use a full database with commits between each migration. + // + // To make this easier to clean up afterward, we create a new, clean schema + // for each test run and then drop it afterward. + baseDBPool := riverinternaltest.TestDB(ctx, t) + schemaName := "river_migrate_test_" + randutil.Hex(8) + _, err := baseDBPool.Exec(ctx, "CREATE SCHEMA "+schemaName) + require.NoError(t, err) + + t.Cleanup(func() { + _, err := baseDBPool.Exec(ctx, fmt.Sprintf("DROP SCHEMA %s CASCADE", schemaName)) + require.NoError(t, err) + }) - // Despite being in an isolated database, we still start a transaction - // because we don't want schema changes we make to persist. - tx, err := dbPool.Begin(ctx) + newSchemaConfig := baseDBPool.Config() + newSchemaConfig.ConnConfig.RuntimeParams["search_path"] = schemaName + newSchemaPool, err := pgxpool.NewWithConfig(ctx, newSchemaConfig) require.NoError(t, err) - t.Cleanup(func() { _ = tx.Rollback(ctx) }) bundle := &testBundle{ - dbPool: dbPool, - driver: &driverWithAlternateLine{Driver: riverpgxv5.New(dbPool)}, + dbPool: newSchemaPool, + driver: &driverWithAlternateLine{Driver: riverpgxv5.New(newSchemaPool)}, logger: riversharedtest.Logger(t), - tx: tx, } migrator, err := New(bundle.driver, &Config{Logger: bundle.logger}) @@ -168,17 +175,37 @@ func TestMigrator(t *testing.T) { migrator, _ := setup(t) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + migrations, err := migrator.ExistingVersions(ctx) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) }) - t.Run("ExistingMigrationsTxDefault", func(t *testing.T) { + t.Run("ExistingMigrationsEmpty", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + migrations, err := migrator.ExistingVersions(ctx) + require.NoError(t, err) + require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxDefaultLine", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) - migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tx.Rollback(ctx)) }) + + migrations, err := migrator.ExistingVersionsTx(ctx, tx) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) }) @@ -188,10 +215,11 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + tx, err := bundle.dbPool.Begin(ctx) require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tx.Rollback(ctx)) }) - migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + migrations, err := migrator.ExistingVersionsTx(ctx, tx) require.NoError(t, err) require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) }) @@ -199,12 +227,12 @@ func TestMigrator(t *testing.T) { t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + migrations, err := migrator.ExistingVersions(ctx) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) }) @@ -219,19 +247,17 @@ func TestMigrator(t *testing.T) { // Run two initial times to get to the version before river_job is dropped. // Defaults to only running one step when moving in the down direction. - for i := migrationsBundle.MaxVersion; i > migrateVersionIncludingRiverJob; i-- { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) - require.NoError(t, err) - require.Equal(t, DirectionDown, res.Direction) - require.Equal(t, []int{i}, sliceutil.Map(res.Versions, migrateVersionToInt)) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrateVersionIncludingRiverJob}) + require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) + require.Equal(t, seqOneTo(migrateVersionIncludingRiverJob), sliceutil.Map(res.Versions, migrateVersionToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT * FROM river_job") - require.NoError(t, err) - } + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT * FROM river_job") + require.NoError(t, err) // Run once more to go down one more step { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, DirectionDown, res.Direction) require.Equal(t, []int{migrateVersionIncludingRiverJob}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -239,7 +265,7 @@ func TestMigrator(t *testing.T) { version := res.Versions[0] require.Equal(t, "initial schema", version.Name) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT * FROM river_job") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT * FROM river_job") require.Error(t, err) } }) @@ -247,12 +273,12 @@ func TestMigrator(t *testing.T) { t.Run("MigrateDownAfterUp", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) }) @@ -262,20 +288,20 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{MaxSteps: 2}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{MaxSteps: 2}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion, migrationsBundle.WithTestVersionsMaxVersion - 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion-2), sliceutil.Map(migrations, driverMigrationToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM test_table") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM test_table") require.Error(t, err) }) @@ -284,6 +310,9 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + // We don't actually migrate anything (max steps = -1) because doing so // would mess with the test database, but this still runs most code to // check that the function generally works. @@ -291,7 +320,7 @@ func TestMigrator(t *testing.T) { require.NoError(t, err) require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -303,7 +332,10 @@ func TestMigrator(t *testing.T) { _, bundle := setup(t) migrator, tx := setupDatabaseSQLMigrator(t, bundle) - res, err := migrator.MigrateTx(ctx, tx, DirectionDown, &MigrateOpts{MaxSteps: 1}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{MaxSteps: 1}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.MaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -318,20 +350,20 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: 4}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: 4}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationsBundle.WithTestVersionsMaxVersion, 5), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetAllAssumingMain(ctx) + migrations, err := bundle.driver.GetExecutor().MigrationGetAllAssumingMain(ctx) require.NoError(t, err) require.Equal(t, seqOneTo(4), sliceutil.Map(migrations, driverMigrationToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM test_table") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM test_table") require.Error(t, err) }) @@ -340,32 +372,32 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationsBundle.WithTestVersionsMaxVersion, 1), sliceutil.Map(res.Versions, migrateVersionToInt)) - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM river_migrate") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM river_migrate") require.Error(t, err) }) t.Run("MigrateDownWithTargetVersionInvalid", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) // migration doesn't exist { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 77}) + _, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 77}) require.EqualError(t, err, fmt.Sprintf("version %d is not a valid River migration version", migrationsBundle.MaxVersion+77)) } // migration exists but not one that's applied { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 1}) + _, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 1}) require.EqualError(t, err, fmt.Sprintf("version %d is not in target list of valid migrations to apply", migrationsBundle.MaxVersion+1)) } }) @@ -375,17 +407,17 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{DryRun: true}) + res, err := migrator.Migrate(ctx, DirectionDown, &MigrateOpts{DryRun: true}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) // Migrate down returned a result above for a migration that was // removed, but because we're in a dry run, the database still shows // this version. - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -412,9 +444,12 @@ func TestMigrator(t *testing.T) { t.Run("MigrateNilOpts", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, nil) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, nil) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.MaxVersion + 1, migrationsBundle.MaxVersion + 2}, sliceutil.Map(res.Versions, migrateVersionToInt)) }) @@ -426,34 +461,34 @@ func TestMigrator(t *testing.T) { // Run an initial time { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, DirectionUp, res.Direction) - require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion - 1, migrationsBundle.WithTestVersionsMaxVersion}, + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) - _, err = bundle.tx.Exec(ctx, "SELECT * FROM test_table") + _, err = bundle.dbPool.Exec(ctx, "SELECT * FROM test_table") require.NoError(t, err) } // Run once more to verify idempotency { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, DirectionUp, res.Direction) require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) - _, err = bundle.tx.Exec(ctx, "SELECT * FROM test_table") + _, err = bundle.dbPool.Exec(ctx, "SELECT * FROM test_table") require.NoError(t, err) } }) @@ -463,18 +498,21 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{MaxSteps: 1}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: 1}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion - 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion-1), sliceutil.Map(migrations, driverMigrationToInt)) // Column `name` is only added in the second test version. - err = dbExecError(ctx, bundle.driver.UnwrapExecutor(bundle.tx), "SELECT name FROM test_table") + err = dbExecError(ctx, bundle.driver.GetExecutor(), "SELECT name FROM test_table") require.Error(t, err) var pgErr *pgconn.PgError @@ -487,14 +525,11 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - // We don't actually migrate anything (max steps = -1) because doing so - // would mess with the test database, but this still runs most code to - // check that the function generally works. - res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: -1}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) require.NoError(t, err) - require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -506,7 +541,10 @@ func TestMigrator(t *testing.T) { _, bundle := setup(t) migrator, tx := setupDatabaseSQLMigrator(t, bundle) - res, err := migrator.MigrateTx(ctx, tx, DirectionUp, &MigrateOpts{MaxSteps: 1}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: 1}) require.NoError(t, err) require.Equal(t, []int{migrationsBundle.MaxVersion + 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -521,12 +559,12 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 2}) + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion + 2}) require.NoError(t, err) - require.Equal(t, []int{migrationsBundle.MaxVersion + 1, migrationsBundle.MaxVersion + 2}, + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion+2), sliceutil.Map(migrations, driverMigrationToInt)) }) @@ -534,17 +572,20 @@ func TestMigrator(t *testing.T) { t.Run("MigrateUpWithTargetVersionInvalid", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) // migration doesn't exist { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{TargetVersion: 77}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: 77}) require.EqualError(t, err, "version 77 is not a valid River migration version") } // migration exists but already applied { - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{TargetVersion: 3}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: 3}) require.EqualError(t, err, "version 3 is not in target list of valid migrations to apply") } }) @@ -554,7 +595,10 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{DryRun: true}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{DryRun: true}) require.NoError(t, err) require.Equal(t, DirectionUp, res.Direction) require.Equal(t, []int{migrationsBundle.WithTestVersionsMaxVersion - 1, migrationsBundle.WithTestVersionsMaxVersion}, @@ -563,7 +607,7 @@ func TestMigrator(t *testing.T) { // Migrate up returned a result above for migrations that were applied, // but because we're in a dry run, the database still shows the test // migration versions not applied. - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -572,13 +616,13 @@ func TestMigrator(t *testing.T) { t.Run("ValidateSuccess", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) // Migrate all the way up. - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.ValidateTx(ctx, bundle.tx) + res, err := migrator.Validate(ctx) require.NoError(t, err) require.Equal(t, &ValidateResult{OK: true}, res) }) @@ -586,41 +630,52 @@ func TestMigrator(t *testing.T) { t.Run("ValidateUnappliedMigrations", func(t *testing.T) { t.Parallel() - migrator, bundle := setup(t) + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) - res, err := migrator.ValidateTx(ctx, bundle.tx) + res, err := migrator.Validate(ctx) require.NoError(t, err) require.Equal(t, &ValidateResult{ Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion+1, migrationsBundle.MaxVersion+2)}, }, res) }) - t.Run("MigrateDownToZeroAndBackUp", func(t *testing.T) { + t.Run("MigrateUpThenDownToZeroAndBackUp", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) requireMigrationTableExists := func(expectedExists bool) { - migrationExists, err := bundle.driver.UnwrapExecutor(bundle.tx).TableExists(ctx, "river_migration") + migrationExists, err := bundle.driver.GetExecutor().TableExists(ctx, "river_migration") require.NoError(t, err) require.Equal(t, expectedExists, migrationExists) } + // We start off with a clean schema so it has no tables: + requireMigrationTableExists(false) + + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), + sliceutil.Map(res.Versions, migrateVersionToInt)) + requireMigrationTableExists(true) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + res, err = migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationsBundle.MaxVersion, 1), sliceutil.Map(res.Versions, migrateVersionToInt)) requireMigrationTableExists(false) - res, err = migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err = migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -629,7 +684,11 @@ func TestMigrator(t *testing.T) { t.Run("AlternateLineUpAndDown", func(t *testing.T) { t.Parallel() - _, bundle := setup(t) + migrator, bundle := setup(t) + + // Run the main migration line all the way up. + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) + require.NoError(t, err) // We have to reinitialize the alternateMigrator because the migrations bundle is // set in the constructor. @@ -639,23 +698,23 @@ func TestMigrator(t *testing.T) { }) require.NoError(t, err) - res, err := alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + res, err := alternateMigrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) require.Equal(t, seqOneTo(migrationLineAlternateMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, migrationLineAlternate) + migrations, err := bundle.driver.GetExecutor().MigrationGetByLine(ctx, migrationLineAlternate) require.NoError(t, err) require.Equal(t, seqOneTo(migrationLineAlternateMaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) - res, err = alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + res, err = alternateMigrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) require.Equal(t, seqDownTo(migrationLineAlternateMaxVersion, 1), sliceutil.Map(res.Versions, migrateVersionToInt)) // The main migration line should not have been touched. - migrations, err = bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetByLine(ctx, riverdriver.MigrationLineMain) + migrations, err = bundle.driver.GetExecutor().MigrationGetByLine(ctx, riverdriver.MigrationLineMain) require.NoError(t, err) require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, driverMigrationToInt)) @@ -667,7 +726,7 @@ func TestMigrator(t *testing.T) { migrator, bundle := setup(t) // Main line to just before the `line` column was added. - _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: 4}) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: 4}) require.NoError(t, err) alternateMigrator, err := New(bundle.driver, &Config{ @@ -677,58 +736,55 @@ func TestMigrator(t *testing.T) { require.NoError(t, err) // Alternate line not allowed because `river_job.line` doesn't exist. - _, err = alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err = alternateMigrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.EqualError(t, err, "can't add a non-main migration line until `river_migration.line` is raised; fully migrate the main migration line and try again") // Main line to zero. - _, err = migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + _, err = migrator.Migrate(ctx, DirectionDown, &MigrateOpts{TargetVersion: -1}) require.NoError(t, err) // Alternate line not allowed because `river_job` doesn't exist. - _, err = alternateMigrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + _, err = alternateMigrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.EqualError(t, err, "can't add a non-main migration line until `river_migration` is raised; fully migrate the main migration line and try again") }) // Demonstrates that even when not using River's internal migration system, // version 005 is still able to run. + // + // This is special because it's the first time the table's changed since + // version 001. t.Run("Version005ToleratesRiverMigrateNotPresent", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) - // The migration version in which `line` is added to `river_migration`. - // This is special because it's the first time the table's changed since - // version 001. - const migrateVersionTarget = 5 - // Migrate down to version 004. - for i := migrationsBundle.MaxVersion; i > migrateVersionTarget-1; i-- { - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) - require.NoError(t, err) - require.Equal(t, DirectionDown, res.Direction) - require.Equal(t, []int{i}, sliceutil.Map(res.Versions, migrateVersionToInt)) - } + res, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: 4}) + require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) + require.Equal(t, seqOneTo(4), sliceutil.Map(res.Versions, migrateVersionToInt)) // Drop `river_migration` table as if version 001 had never originally run. - _, err := bundle.tx.Exec(ctx, "DROP TABLE river_migration") + _, err = bundle.dbPool.Exec(ctx, "DROP TABLE river_migration") require.NoError(t, err) // Run version 005 to make sure it can tolerate the absence of // `river_migration`. Note that we have to run the version's SQL // directly because using the migrator will try to interact with // `river_migration`, which is no longer present. - _, err = bundle.tx.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLUp) + _, err = bundle.dbPool.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLUp) require.NoError(t, err) // And the version 005 down migration to verify the same. - _, err = bundle.tx.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLDown) + _, err = bundle.dbPool.Exec(ctx, migrationsBundle.WithTestVersionsMap[5].SQLDown) require.NoError(t, err) }) t.Run("MigrationsWithCommitRequired", func(t *testing.T) { t.Parallel() - _, bundle := setup(t) + migrator, bundle := setup(t) + t.Cleanup(func() { tx, err := bundle.dbPool.Begin(ctx) require.NoError(t, err) @@ -747,6 +803,9 @@ func TestMigrator(t *testing.T) { require.NoError(t, tx.Commit(ctx)) }) + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + // We have to reinitialize the commitRequiredMigrator because the migrations // bundle is set in the constructor. commitRequiredMigrator, err := New(bundle.driver, &Config{ diff --git a/rivershared/util/randutil/rand_util.go b/rivershared/util/randutil/rand_util.go index 5f4e9836..fe2928c5 100644 --- a/rivershared/util/randutil/rand_util.go +++ b/rivershared/util/randutil/rand_util.go @@ -3,6 +3,7 @@ package randutil import ( cryptorand "crypto/rand" "encoding/binary" + "encoding/hex" mathrand "math/rand" "sync" "time" @@ -28,6 +29,14 @@ func DurationBetween(rand *mathrand.Rand, lowerLimit, upperLimit time.Duration) return time.Duration(IntBetween(rand, int(lowerLimit), int(upperLimit))) } +func Hex(length int) string { + bytes := make([]byte, length) + if _, err := cryptorand.Read(bytes); err != nil { + panic(err) + } + return hex.EncodeToString(bytes) +} + // IntBetween generates a random number in the range of [lowerLimit, upperLimit). // // TODO: When we drop Go 1.21 support, switch to `math/rand/v2` and kill the