Skip to content

Commit

Permalink
replace adhoc extensions of skunk query (#17)
Browse files Browse the repository at this point in the history
* replace adhoc extensions of skunk query

* fix query, add tests

* update multi-query statement support impl
  • Loading branch information
rolang authored Dec 24, 2023
1 parent 7b434f2 commit aa94290
Show file tree
Hide file tree
Showing 15 changed files with 561 additions and 378 deletions.
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,20 @@ The migration can be executed like:

```scala
import cats.effect.{IO, IOApp}
import dumbo.Dumbo
import dumbo.{ConnectionConfig, Dumbo}
import natchez.Trace.Implicits.noop

object ExampleApp extends IOApp.Simple {
override def run: IO[Unit] = Dumbo
.withResourcesIn[IO]("db/migration")
.apply(
sessionResource = skunk.Session.single[IO](
connection = ConnectionConfig(
host = "localhost",
port = 5432,
user = "postgres",
database = "postgres",
password = Some("postgres"),
ssl = skunk.SSL.None, // skunk.SSL config, default is skunk.SSL.None
),
defaultSchema = "public",
)
Expand All @@ -93,7 +94,6 @@ object ExampleApp extends IOApp.Simple {
IO.println(s"Migration completed with ${result.migrationsExecuted} migrations")
}
}

```

To run the example locally with docker and sbt, start a Postgres docker container:
Expand Down Expand Up @@ -146,8 +146,15 @@ val dumboWithResouces = Dumbo.withFilesIn[IO](

```scala
dumboWithResouces.apply(
// skunk session resource
sessionResource: Resource[F, Session[F]],
// connection config
connection: dumbo.ConnectionConfig = dumbo.ConnectionConfig(
host = "localhost",
port = 5432,
user = "postgres",
database = "postgres",
password = Some("postgres"),
ssl = skunk.SSL.None, // skunk.SSL config, default is skunk.SSL.None
),

// default schema (the history state is going to be stored under that schema)
defaultSchema: String = "public",
Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ lazy val commonSettings = List(
},
Compile / scalacOptions ++= {
if (scalaVersion.value == `scala-3`)
Seq("-source:future", "-language:adhocExtensions")
Seq("-source:future")
else
Seq("-Xsource:3")
},
Expand Down Expand Up @@ -161,6 +161,7 @@ lazy val example = project
.dependsOn(core.jvm)
.settings(commonSettings)
.settings(
publish / skip := true,
Compile / run / fork := true,
publish / skip := true,
scalacOptions -= "-Xfatal-warnings",
)
14 changes: 14 additions & 0 deletions modules/core/shared/src/main/scala/dumbo/ConnectionConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2023 by Roman Langolf
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT

package dumbo

final case class ConnectionConfig(
host: String,
port: Int = 5432,
user: String,
database: String,
password: Option[String] = None,
ssl: skunk.SSL = skunk.SSL.None,
)
102 changes: 64 additions & 38 deletions modules/core/shared/src/main/scala/dumbo/Dumbo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,47 @@ import cats.data.Validated.{Invalid, Valid}
import cats.data.{NonEmptyChain, NonEmptyList, ValidatedNec}
import cats.effect.kernel.Clock
import cats.effect.std.Console
import cats.effect.{Async, Resource, Sync}
import cats.effect.{Async, Resource, Sync, Temporal}
import cats.implicits.*
import dumbo.exception.DumboValidationException
import dumbo.internal.{ResourceReader, Statements}
import dumbo.internal.{ResourceReader, Session as DumboSession, Statements}
import fs2.Stream
import fs2.io.file.*
import skunk.*
import fs2.io.net.Network
import natchez.Trace
import skunk.codec.all.*
import skunk.data.Completion
import skunk.implicits.*
import skunk.util.Origin
import skunk.Command as SqlCommand
import skunk.util.{Origin, Typer}
import skunk.{Session as SkunkSession, *}

final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F]) {
def apply(
sessionResource: Resource[F, Session[F]],
connection: ConnectionConfig,
defaultSchema: String = "public",
schemas: Set[String] = Set.empty[String],
schemaHistoryTable: String = "flyway_schema_history",
validateOnMigrate: Boolean = true,
)(implicit S: Sync[F], C: Console[F]) = new Dumbo[F](
resReader = reader,
sessionResource = sessionResource,
defaultSchema = defaultSchema,
schemas = schemas,
schemaHistoryTable = schemaHistoryTable,
validateOnMigrate = validateOnMigrate,
)
)(implicit S: Sync[F], T: Temporal[F], C: Console[F], TRC: Trace[F], N: Network[F]) =
new Dumbo[F](
resReader = reader,
sessionResource = toSessionResource(connection, defaultSchema, schemas),
defaultSchema = defaultSchema,
schemas = schemas,
schemaHistoryTable = schemaHistoryTable,
validateOnMigrate = validateOnMigrate,
)

def withMigrationStateLogAfter(logMigrationStateAfter: FiniteDuration)(
sessionResource: Resource[F, Session[F]],
connection: ConnectionConfig,
defaultSchema: String = "public",
schemas: Set[String] = Set.empty[String],
schemaHistoryTable: String = "flyway_schema_history",
validateOnMigrate: Boolean = true,
)(implicit A: Async[F], C: Console[F]): Dumbo[F] =
)(implicit A: Async[F], C: Console[F], TRC: Trace[F]): Dumbo[F] = {
implicit val network: Network[F] = Network.forAsync(A)
val sessionResource = toSessionResource(connection, defaultSchema, schemas)

new Dumbo[F](
resReader = reader,
sessionResource = sessionResource,
Expand Down Expand Up @@ -91,14 +96,35 @@ final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F])
.drain
}.void,
)
}

def listMigrationFiles(implicit S: Sync[F]): F[ValidatedNec[DumboValidationException, List[ResourceFile]]] =
Dumbo.listMigrationFiles(reader)

private def toSessionResource(
connection: ConnectionConfig,
defaultSchema: String,
schemas: Set[String],
)(implicit T: Temporal[F], C: Console[F], TRC: Trace[F], N: Network[F]) = {
val searchPath = (schemas + defaultSchema).mkString(",")
val params = SkunkSession.DefaultConnectionParameters ++ Map("search_path" -> searchPath)

DumboSession.single[F](
host = connection.host,
port = connection.port,
user = connection.user,
database = connection.database,
password = connection.password,
strategy = Typer.Strategy.SearchPath,
ssl = connection.ssl,
parameters = params,
)
}
}

class Dumbo[F[_]: Sync: Console](
resReader: ResourceReader[F],
sessionResource: Resource[F, Session[F]],
sessionResource: Resource[F, DumboSession[F]],
defaultSchema: String = "public",
schemas: Set[String] = Set.empty,
schemaHistoryTable: String = "flyway_schema_history",
Expand All @@ -113,7 +139,7 @@ class Dumbo[F[_]: Sync: Console](

private def initSchemaCmd(schema: String) = sql"CREATE SCHEMA IF NOT EXISTS #${schema}".command

private def transact(source: ResourceFile, fs: ResourceReader[F], session: Session[F]): F[HistoryEntry.New] =
private def transact(source: ResourceFile, fs: ResourceReader[F], session: DumboSession[F]): F[HistoryEntry.New] =
for {
_ <-
Console[F].println(
Expand All @@ -132,16 +158,18 @@ class Dumbo[F[_]: Sync: Console](
if (source.executeInTransaction) Vector(sql) else Statements.intoSingleStatements(sql)
}

(duration, _) <- Sync[F].timed {
(duration, _) <- Clock[F].timed {
statements
.map(sql =>
SqlCommand(
sql = sql,
origin = Origin(source.path.toString, line = 0),
Void.codec,
)
.map(statementSql =>
new Statement[Void] {
override val sql: String = statementSql
override val origin: Origin = Origin(source.path.toString, line = 0)
override val encoder: Encoder[Void] = Void.codec
override val cacheKey: Statement.CacheKey =
Statement.CacheKey(statementSql, encoder.types, Nil)
}
)
.traverse_(session.execute(_))
.traverse_(session.execute_(_))
}
_ <- Console[F].println(s"Migration to version ${source.versionRaw} - ${source.scriptDescription} completed")
} yield HistoryEntry.New(
Expand All @@ -154,7 +182,7 @@ class Dumbo[F[_]: Sync: Console](
success = true,
)

private def validationGuard(session: Session[F], sourceFiles: List[ResourceFile]) =
private def validationGuard(session: DumboSession[F], sourceFiles: List[ResourceFile]) =
if (sourceFiles.nonEmpty) {
session
.execute(dumboHistory.loadAllQuery)
Expand All @@ -168,7 +196,7 @@ class Dumbo[F[_]: Sync: Console](
} else ().pure[F]

private def migrateToNext(
session: Session[F],
session: DumboSession[F],
fs: ResourceReader[F],
)(sourceFiles: List[ResourceFile]): F[Option[(HistoryEntry, List[ResourceFile])]] =
sourceFiles match {
Expand All @@ -184,17 +212,15 @@ class Dumbo[F[_]: Sync: Console](
result <- sourceFiles.dropWhile(s => latestInstalled.exists(s.version <= _)) match {
case head :: tail =>
// acquire a new session for non-transactional operation
val transactSession: Resource[F, Session[F]] =
val transactSession: Resource[F, DumboSession[F]] =
if (head.executeInTransaction) Resource.pure(session) else sessionResource

transactSession.use { ts =>
for {
_ <- ts.execute(sql"SET SEARCH_PATH = #${allSchemas.toList.mkString(",")}".command)
newEntry <- transact(head, fs, ts)
} yield newEntry
}.flatMap { newEntry =>
session.unique(dumboHistory.insertSQLEntry)(newEntry)
}.map((_, tail).some)
transactSession
.use(transact(head, fs, _))
.flatMap { newEntry =>
session.unique(dumboHistory.insertSQLEntry)(newEntry)
}
.map((_, tail).some)
case _ => none.pure[F]
}
} yield result
Expand All @@ -212,7 +238,7 @@ class Dumbo[F[_]: Sync: Console](

def runMigration: F[MigrationResult] = sessionResource.use(migrateBySession)

private def migrateBySession(session: Session[F]): F[Dumbo.MigrationResult] = for {
private def migrateBySession(session: DumboSession[F]): F[Dumbo.MigrationResult] = for {
schemaRes <-
allSchemas.toList
.flatTraverse(schema =>
Expand Down
Loading

0 comments on commit aa94290

Please sign in to comment.