update for compatibility with CockroachDB (#43)
* update for compatibility with CockroachDB

* extend tests to run with Postgres 16 and 11
rolang authored May 11, 2024
1 parent 8fd3dad commit 640ad6e
Showing 23 changed files with 343 additions and 137 deletions.
14 changes: 13 additions & 1 deletion README.md
@@ -7,7 +7,8 @@

![Logo](./docs/assets/logo.png)

Simple database migration tool for Scala + Postgres with [skunk](https://typelevel.org/skunk/) that can be deployed on JVM and Native.
Simple database migration tool for [Postgres](https://www.postgresql.org) with [skunk](https://typelevel.org/skunk/).
Usable via the command line or as a library in your Scala project targeting JVM or Native (see [usage example](#usage-example)).
Supports a subset of [Flyway](https://flywaydb.org) features and keeps a Flyway-compatible history state so you can switch to Flyway if necessary.
Depending on the Flyway features you use, you may also be able to switch from Flyway to Dumbo without any changes to migration files or the history state.

@@ -58,6 +59,17 @@ key=value
executeInTransaction=false
```

⚠️⚠️
**NOTE**: By default, Dumbo attempts to execute each migration as a [simple query with multiple statements](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT)
inside a transaction (unlike Flyway, which may decide not to).
To disable this behavior you need to set it explicitly using the configuration above.

Use this option with care and avoid it where possible: partially applied migrations require manual intervention.
Dumbo does not update the history state in case of partial failures.
If you re-run the migration process, it will attempt to execute the script the same way it did before it failed.
To recover, roll back the applied changes manually, then update the migration script and/or split it into multiple files before re-running the migration process.
⚠️⚠️
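
For illustration, a per-script setting like the one above lives in a `.conf` file placed next to the migration script (the `listMigrationFiles` change further down in this diff resolves it as `<script>.conf`). The Flyway-style file names below are hypothetical:

```
V2__large_backfill.sql       -- migration script to be executed without a wrapping transaction
V2__large_backfill.sql.conf  -- contains the line: executeInTransaction=false
```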

## Usage example

For usage via command line see [command-line](#command-line) section.
6 changes: 3 additions & 3 deletions build.sbt
@@ -235,9 +235,9 @@ lazy val skunkVersion = "1.0.0-M5"

lazy val epollcatVersion = "0.1.6"

lazy val munitVersion = "1.0.0-M11"
lazy val munitVersion = "1.0.0-RC1"

lazy val munitCEVersion = "2.0.0-M4"
lazy val munitCEVersion = "2.0.0-RC1"

lazy val core = crossProject(JVMPlatform, NativePlatform)
.crossType(CrossType.Full)
@@ -344,7 +344,7 @@ lazy val tests = crossProject(JVMPlatform, NativePlatform)
},
)

lazy val flywayVersion = "10.11.1"
lazy val flywayVersion = "10.12.0"
lazy val postgresqlVersion = "42.7.3"
lazy val testsFlyway = project
.in(file("modules/tests-flyway"))
38 changes: 32 additions & 6 deletions docker-compose.yaml
@@ -1,13 +1,39 @@
services:
pg_1:
image: postgres:15-alpine
pg_latest_1:
image: postgres:16-alpine
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: postgres
pg_2:
image: postgres:15-alpine
POSTGRES_USER: root
POSTGRES_HOST_AUTH_METHOD: trust
pg_latest_2:
image: postgres:16-alpine
ports:
- "5433:5432"
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: root
POSTGRES_HOST_AUTH_METHOD: trust
pg_11_1:
image: postgres:11-alpine
ports:
- "5434:5432"
environment:
POSTGRES_USER: root
POSTGRES_HOST_AUTH_METHOD: trust
pg_11_2:
image: postgres:11-alpine
ports:
- "5435:5432"
environment:
POSTGRES_USER: root
POSTGRES_HOST_AUTH_METHOD: trust
cockroachdb_1:
image: cockroachdb/cockroach:v23.2.4
ports:
- "5436:26257"
command: start-single-node --insecure
cockroachdb_2:
image: cockroachdb/cockroach:v23.2.4
ports:
- "5437:26257"
command: start-single-node --insecure
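
As a point of reference for the new containers, here is a hedged sketch (not part of this commit) of a connection config pointing the example app at the `cockroachdb_1` service above. It mirrors the `ExampleApp.scala` change below; the `dumbo.ConnectionConfig` import path and the `defaultdb` database name (CockroachDB's default database) are assumptions:

```scala
import dumbo.ConnectionConfig

// Assumed settings for the cockroachdb_1 service defined above:
// host port 5436 is mapped to CockroachDB's 26257, and --insecure mode
// accepts the root user without a password.
val cockroachConnection = ConnectionConfig(
  host = "localhost",
  port = 5436,
  user = "root",
  database = "defaultdb", // assumption: CockroachDB's default database
  password = None,
  ssl = skunk.SSL.None,
)
```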
99 changes: 61 additions & 38 deletions modules/core/shared/src/main/scala/dumbo/Dumbo.scala
@@ -63,40 +63,46 @@ final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F])
schemas = schemas,
schemaHistoryTable = schemaHistoryTable,
validateOnMigrate = validateOnMigrate,
progressMonitor = Async[F].background {
Stream
.evalSeq(
sessionResource
.use(
_.execute(
sql"""SELECT ps.pid, ps.query_start, ps.state_change, ps.state, ps.wait_event_type, ps.wait_event, ps.query
progressMonitor = Resource
.eval(sessionResource.use(s => Dumbo.hasTableLockSupport(s, s"${defaultSchema}.${schemaHistoryTable}")))
.flatMap {
case false => Resource.eval(Console[F].println("Progress monitor is not supported for current database"))
case true =>
Async[F].background {
Stream
.evalSeq(
sessionResource
.use(
_.execute(
sql"""SELECT ps.pid, ps.query_start, ps.state_change, ps.state, ps.wait_event_type, ps.wait_event, ps.query
FROM pg_locks l
JOIN pg_stat_all_tables t ON t.relid = l.relation
JOIN pg_stat_activity ps ON ps.pid = l.pid
WHERE t.schemaname = '#${defaultSchema}' and t.relname = '#${schemaHistoryTable}'"""
.query(int4 *: timestamptz *: timestamptz *: text *: text.opt *: text.opt *: text)
).map(_.groupByNel { case pid *: _ => pid }.toList.map(_._2.head))
)
)
.evalMap { case pid *: start *: changed *: state *: eventType *: event *: query *: _ =>
for {
now <- Clock[F].realTimeInstant
startedAgo = now.getEpochSecond() - start.toEpochSecond()
changedAgo = now.getEpochSecond() - changed.toEpochSecond()
queryLogSize = 150
queryLog = query.take(queryLogSize) + (if (query.size > queryLogSize) "..." else "")
_ <-
Console[F].println(
s"Awaiting query with pid: $pid started: ${startedAgo}s ago (state: $state / last changed: ${changedAgo}s ago, " +
s"eventType: ${eventType.getOrElse("")}, event: ${event.getOrElse("")}):\n${queryLog}"
.query(int4 *: timestamptz *: timestamptz *: text *: text.opt *: text.opt *: text)
).map(_.groupByNel { case pid *: _ => pid }.toList.map(_._2.head))
)
)
} yield ()
}
.repeat
.metered(logMigrationStateAfter)
.compile
.drain
}.void,
.evalMap { case pid *: start *: changed *: state *: eventType *: event *: query *: _ =>
for {
now <- Clock[F].realTimeInstant
startedAgo = now.getEpochSecond() - start.toEpochSecond()
changedAgo = now.getEpochSecond() - changed.toEpochSecond()
queryLogSize = 150
queryLog = query.take(queryLogSize) + (if (query.size > queryLogSize) "..." else "")
_ <-
Console[F].println(
s"Awaiting query with pid: $pid started: ${startedAgo}s ago (state: $state / last changed: ${changedAgo}s ago, " +
s"eventType: ${eventType.getOrElse("")}, event: ${event.getOrElse("")}):\n${queryLog}"
)
} yield ()
}
.repeat
.metered(logMigrationStateAfter)
.compile
.drain
}.void
},
)
}

@@ -117,7 +123,7 @@ final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F])
user = connection.user,
database = connection.database,
password = connection.password,
strategy = Typer.Strategy.SearchPath,
strategy = Typer.Strategy.BuiltinsOnly,
ssl = connection.ssl,
parameters = params,
)
@@ -201,6 +207,7 @@ class Dumbo[F[_]: Sync: Console](
private def migrateToNext(
session: Session[F],
fs: ResourceReader[F],
tableLockSupport: Boolean,
)(sourceFiles: List[ResourceFile]): F[Option[(HistoryEntry, List[ResourceFile])]] =
sourceFiles match {
case Nil => none.pure[F]
@@ -210,7 +217,8 @@ class Dumbo[F[_]: Sync: Console](
_ <- progressMonitor
} yield txn).use { _ =>
for {
_ <- session.execute(sql"LOCK TABLE #${historyTable} IN ACCESS EXCLUSIVE MODE".command)
_ <- if (tableLockSupport) lockTable(session, historyTable).void
else session.executeDiscard(sql"SELECT * FROM #${historyTable} FOR UPDATE".command)
latestInstalled <- session.unique(dumboHistory.findLatestInstalled).map(_.flatMap(_.sourceFileVersion))
result <- sourceFiles.dropWhile(s => latestInstalled.exists(s.version <= _)) match {
case head :: tail =>
@@ -232,7 +240,7 @@ class Dumbo[F[_]: Sync: Console](

// it's supposed to be prevented by IF NOT EXISTS clause when running concurrently
// but it doesn't always seem to prevent it, maybe better to lock another table instead of catching those?
// https://www.postgresql.org/docs/15/errcodes-appendix.html
// https://www.postgresql.org/docs/current/errcodes-appendix.html
private val duplicateErrorCodes = Set(
"42710", // duplicate_object
"23505", // unique_violation
Expand All @@ -242,6 +250,8 @@ class Dumbo[F[_]: Sync: Console](
def runMigration: F[MigrationResult] = sessionResource.use(migrateBySession)

private def migrateBySession(session: Session[F]): F[Dumbo.MigrationResult] = for {
dbVersion <- session.unique(sql"SELECT version()".query(text))
_ <- Console[F].println(s"Starting migration on $dbVersion")
schemaRes <-
allSchemas.toList
.flatTraverse(schema =>
@@ -254,6 +264,7 @@ class Dumbo[F[_]: Sync: Console](
_ <- session.execute(dumboHistory.createTableCommand).void.recover {
case e: skunk.exception.PostgresErrorException if duplicateErrorCodes.contains(e.code) => ()
}
tableLockSupport <- hasTableLockSupport(session, historyTable)
_ <- schemaRes match {
case e @ (_ :: _) => session.execute(dumboHistory.insertSchemaEntry)(e.mkString("\"", "\",\"", "\"")).void
case _ => ().pure[F]
@@ -272,11 +283,12 @@ class Dumbo[F[_]: Sync: Console](
Console[F].println(s"Found ${sourceFiles.size} versioned migration files$inLocation")
}
_ <- if (validateOnMigrate) validationGuard(session, sourceFiles) else ().pure[F]
migrationResult <- Stream
.unfoldEval(sourceFiles)(migrateToNext(session, resReader))
.compile
.toList
.map(Dumbo.MigrationResult(_))
migrationResult <-
Stream
.unfoldEval(sourceFiles)(migrateToNext(session, resReader, tableLockSupport))
.compile
.toList
.map(Dumbo.MigrationResult(_))
} yield migrationResult

_ <- migrationResult.migrations.sorted(Ordering[HistoryEntry].reverse).headOption match {
@@ -355,6 +367,17 @@ object Dumbo extends internal.DumboPlatform {
def withFilesIn[F[_]: Files](dir: Path): DumboWithResourcesPartiallyApplied[F] =
new DumboWithResourcesPartiallyApplied[F](ResourceReader.fileFs(dir))

private[dumbo] def hasTableLockSupport[F[_]: Sync](session: Session[F], table: String) =
session.transaction.use(_ =>
lockTable(session, table).attempt.map {
case Right(Completion.LockTable) => true
case _ => false
}
)

private def lockTable[F[_]](session: Session[F], table: String) =
session.execute(sql"LOCK TABLE #${table} IN ACCESS EXCLUSIVE MODE".command)

private[dumbo] def listMigrationFiles[F[_]: Sync](
fs: ResourceReader[F]
): F[ValidatedNec[DumboValidationException, List[ResourceFile]]] =
@@ -380,7 +403,7 @@ object Dumbo extends internal.DumboPlatform {
fs: ResourceReader[F]
): Stream[F, Either[String, ResourceFile]] =
fs.list
.filter(f => f.value.endsWith(".sql") || f.value.endsWith(".sql.conf"))
.filter(f => f.value.endsWith(".sql"))
.evalMap { path =>
val confPath = path.append(".conf")

8 changes: 4 additions & 4 deletions modules/core/shared/src/main/scala/dumbo/History.scala
@@ -51,21 +51,21 @@ object HistoryEntry {
.to[HistoryEntry]

val fieldNames =
"installed_rank, version, description, type, script, checksum, installed_by, installed_on, execution_time, success"
"installed_rank::INT4, version, description, type, script, checksum::INT4, installed_by, installed_on, execution_time::INT4, success"
}

class History(tableName: String) {
val createTableCommand: Command[Void] =
sql"""CREATE TABLE IF NOT EXISTS #${tableName} (
installed_rank INT NOT NULL PRIMARY KEY,
installed_rank INT4 NOT NULL PRIMARY KEY,
version VARCHAR(50) NULL,
description VARCHAR(200) NOT NULL,
type VARCHAR(20) NOT NULL,
script VARCHAR(1000) NOT NULL,
checksum INT NULL,
checksum INT4 NULL,
installed_by VARCHAR(100) NOT NULL,
installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
execution_time INT NOT NULL,
execution_time INT4 NOT NULL,
success BOOL NOT NULL
)""".command

4 changes: 2 additions & 2 deletions modules/example/src/main/scala/ExampleApp.scala
@@ -13,9 +13,9 @@ object ExampleApp extends IOApp.Simple {
connection = ConnectionConfig(
host = "localhost",
port = 5432,
user = "postgres",
user = "root",
database = "postgres",
password = Some("postgres"),
password = None,
ssl = skunk.SSL.None, // skunk.SSL config, default is skunk.SSL.None
),
defaultSchema = "dumbo",