Skip to content

Commit

Permalink
update skunk to 1.0.0-M4, drop scala 2.12 support (#23)
Browse files Browse the repository at this point in the history
* update skunk to 1.0.0-M4, drop scala 2.12 support
  • Loading branch information
rolang authored Feb 7, 2024
1 parent 8f203fe commit 95454db
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 135 deletions.
18 changes: 2 additions & 16 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [3, 2.13, 2.12]
scala: [3, 2.13]
java: [temurin@21, temurin@17]
project: [rootJVM, rootNative]
exclude:
- scala: 2.13
java: temurin@17
- scala: 2.12
java: temurin@17
- project: rootNative
java: temurin@17
- project: rootNative
scala: 2.12
- project: rootNative
scala: 2.13
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -209,16 +205,6 @@ jobs:
tar xf targets.tar
rm targets.tar
- name: Download target directories (2.12, rootJVM)
uses: actions/download-artifact@v4
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-2.12-rootJVM

- name: Inflate target directories (2.12, rootJVM)
run: |
tar xf targets.tar
rm targets.tar
- name: Import signing key
if: env.PGP_SECRET != '' && env.PGP_PASSPHRASE == ''
env:
Expand Down Expand Up @@ -308,5 +294,5 @@ jobs:
- name: Submit Dependencies
uses: scalacenter/sbt-dependency-submission@v2
with:
modules-ignore: dumbo_3 dumbo_2.13 dumbo_2.12 testsflyway_3 testsflyway_2.13 testsflyway_2.12 dumbo_3 dumbo_2.13 dumbo_2.12 dumbo_3 dumbo_2.13 dumbo_2.12 tests_3 tests_2.13 tests_2.12 example_3 example_2.13 example_2.12 tests_native0.4_3
modules-ignore: dumbo_3 dumbo_2.13 testsflyway_3 testsflyway_2.13 dumbo_3 dumbo_2.13 dumbo_3 dumbo_2.13 tests_3 tests_2.13 example_3 example_2.13 tests_native0.4_3
configs-ignore: test scala-tool scala-doc-tool test-internal
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Dumbo

[![Sonatype Releases](https://img.shields.io/nexus/r/https/oss.sonatype.org/dev.rolang/dumbo_2.13.svg?label=Sonatype%20Release)](https://oss.sonatype.org/content/repositories/releases/dev/rolang/dumbo_2.13/)
[![Sonatype Snapshots](https://img.shields.io/nexus/s/https/oss.sonatype.org/dev.rolang/dumbo_2.13.svg?label=Sonatype%20Snapshot)](https://oss.sonatype.org/content/repositories/snapshots/dev/rolang/dumbo_2.13/)
[![Sonatype Releases](https://img.shields.io/nexus/r/https/oss.sonatype.org/dev.rolang/dumbo_3.svg?label=Sonatype%20Release)](https://oss.sonatype.org/content/repositories/releases/dev/rolang/dumbo_3/)
[![Sonatype Snapshots](https://img.shields.io/nexus/s/https/oss.sonatype.org/dev.rolang/dumbo_3.svg?label=Sonatype%20Snapshot)](https://oss.sonatype.org/content/repositories/snapshots/dev/rolang/dumbo_3/)

![Logo](./docs/assets/logo.png)

Expand Down Expand Up @@ -62,8 +62,14 @@ For usage via command line see [command-line](#command-line) section.

In a sbt project dumbo can be added like:

```scala
libraryDependencies += "dev.rolang" %% "dumbo" % "0.1.x"
```
libraryDependencies += "dev.rolang" %% "dumbo" % "0.0.7"

_For compatibility with skunk `0.6.x` / natchez / Scala 2.12.x use release series `0.0.x`_:

```scala
libraryDependencies += "dev.rolang" %% "dumbo" % "0.0.x"
```

To include snapshot releases, add snapshot resolver:
Expand Down
10 changes: 4 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import scala.scalanative.build.*

lazy val `scala-2.12` = "2.12.18"
lazy val `scala-2.13` = "2.13.12"
lazy val `scala-3` = "3.3.1"

ThisBuild / tlBaseVersion := "0.0"
ThisBuild / tlBaseVersion := "0.1"
ThisBuild / startYear := Some(2023)
ThisBuild / scalaVersion := `scala-3`
ThisBuild / crossScalaVersions := Seq(`scala-3`, `scala-2.13`, `scala-2.12`)
ThisBuild / crossScalaVersions := Seq(`scala-3`, `scala-2.13`)

ThisBuild / organization := "dev.rolang"
ThisBuild / licenses := Seq(License.MIT)
Expand Down Expand Up @@ -97,8 +96,7 @@ ThisBuild / githubWorkflowBuild += WorkflowStep.Sbt(
)

ThisBuild / githubWorkflowBuildMatrixExclusions ++= Seq(
MatrixExclude(Map("project" -> "rootNative", "scala" -> "2.12")),
MatrixExclude(Map("project" -> "rootNative", "scala" -> "2.13")),
MatrixExclude(Map("project" -> "rootNative", "scala" -> "2.13"))
)

addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
Expand Down Expand Up @@ -138,7 +136,7 @@ lazy val root = tlCrossRootProject
.aggregate(core, cli, tests, testsFlyway, example)
.settings(commonSettings)

lazy val skunkVersion = "0.6.3"
lazy val skunkVersion = "1.0.0-M4"

lazy val epollcatVersion = "0.1.6"

Expand Down
2 changes: 1 addition & 1 deletion modules/cli/shared/src/main/scala/dumbo/cli/Dumbo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package dumbo.cli

import cats.effect.IO
import cats.effect.ExitCode
import natchez.Trace.Implicits.noop
import org.typelevel.otel4s.trace.Tracer.Implicits.noop
import skunk.SSL
import cats.effect.std.Console
import dumbo.exception.DumboValidationException
Expand Down
8 changes: 4 additions & 4 deletions modules/core/shared/src/main/scala/dumbo/Dumbo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import dumbo.internal.{ResourceReader, Session as DumboSession, Statements}
import fs2.Stream
import fs2.io.file.*
import fs2.io.net.Network
import natchez.Trace
import org.typelevel.otel4s.trace.Tracer
import skunk.codec.all.*
import skunk.data.Completion
import skunk.implicits.*
Expand All @@ -34,7 +34,7 @@ final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F])
schemas: Set[String] = Dumbo.defaults.schemas,
schemaHistoryTable: String = Dumbo.defaults.schemaHistoryTable,
validateOnMigrate: Boolean = Dumbo.defaults.validateOnMigrate,
)(implicit S: Sync[F], T: Temporal[F], C: Console[F], TRC: Trace[F], N: Network[F]) =
)(implicit S: Sync[F], T: Temporal[F], C: Console[F], TRC: Tracer[F], N: Network[F]) =
new Dumbo[F](
resReader = reader,
sessionResource = toSessionResource(connection, defaultSchema, schemas),
Expand All @@ -51,7 +51,7 @@ final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F])
schemas: Set[String] = Dumbo.defaults.schemas,
schemaHistoryTable: String = Dumbo.defaults.schemaHistoryTable,
validateOnMigrate: Boolean = Dumbo.defaults.validateOnMigrate,
)(implicit A: Async[F], C: Console[F], TRC: Trace[F]): Dumbo[F] = {
)(implicit A: Async[F], C: Console[F], TRC: Tracer[F]): Dumbo[F] = {
implicit val network: Network[F] = Network.forAsync(A)
val sessionResource = toSessionResource(connection, defaultSchema, schemas)

Expand Down Expand Up @@ -107,7 +107,7 @@ final class DumboWithResourcesPartiallyApplied[F[_]](reader: ResourceReader[F])
connection: ConnectionConfig,
defaultSchema: String,
schemas: Set[String],
)(implicit T: Temporal[F], C: Console[F], TRC: Trace[F], N: Network[F]) = {
)(implicit T: Temporal[F], C: Console[F], TRC: Tracer[F], N: Network[F]) = {
val searchPath = (schemas + defaultSchema).mkString(",")
val params = SkunkSession.DefaultConnectionParameters ++ Map("search_path" -> searchPath)

Expand Down
138 changes: 81 additions & 57 deletions modules/core/shared/src/main/scala/dumbo/internal/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ import cats.data.Kleisli
import cats.effect.*
import cats.effect.std.Console
import cats.syntax.all.*
import com.comcast.ip4s.{Hostname, Port, SocketAddress}
import dumbo.internal.net.Protocol
import fs2.Stream
import fs2.concurrent.Signal
import fs2.io.net.{Network, SocketGroup, SocketOption}
import fs2.{Pipe, Stream}
import natchez.Trace
import fs2.io.net.{Network, Socket, SocketGroup, SocketOption}
import org.typelevel.otel4s.trace.Tracer
import skunk.codec.all.bool
import skunk.data.{Identifier, *}
import skunk.net.SSLNegotiation
import skunk.net.protocol.{Describe, Parse}
import skunk.util.Typer.Strategy.{BuiltinsOnly, SearchPath}
import skunk.util.*
import skunk.{Session as SkunkSession, *}

// extension of skunk.Session to support any multi-query statements with discarded results
// this could be removed in the future if it can be made part of skunk
// this could be removed in the future if it can be made part of skunk: https://github.com/typelevel/skunk/pull/1023
private[dumbo] trait Session[F[_]] extends SkunkSession[F] {
def execute_(statement: Statement[skunk.Void]): F[Unit]
}
Expand All @@ -48,7 +50,7 @@ private[dumbo] object Session {
Recycler(_.execute(Command("RESET ALL", Origin.unknown, Void.codec)).as(true))
}

def single[F[_]: Temporal: Trace: Network: Console](
def single[F[_]: Temporal: Tracer: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -77,7 +79,7 @@ private[dumbo] object Session {
queryCache,
parseCache,
readTimeout,
).apply(Trace[F])
).apply(Tracer[F])

def singleF[F[_]: Temporal: Network: Console](
host: String,
Expand All @@ -93,8 +95,8 @@ private[dumbo] object Session {
queryCache: Int = 1024,
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Trace[F] => Resource[F, Session[F]] =
Kleisli((_: Trace[F]) =>
): Tracer[F] => Resource[F, Session[F]] =
Kleisli((_: Tracer[F]) =>
pooledF(
host = host,
port = port,
Expand All @@ -111,7 +113,7 @@ private[dumbo] object Session {
parseCache = parseCache,
readTimeout = readTimeout,
)
).flatMap(f => Kleisli((T: Trace[F]) => f(T))).run
).flatMap(f => Kleisli((T: Tracer[F]) => f(T))).run

def pooledF[F[_]: Temporal: Network: Console](
host: String,
Expand All @@ -129,10 +131,10 @@ private[dumbo] object Session {
queryCache: Int = 1024,
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Resource[F, Trace[F] => Resource[F, Session[F]]] = {
): Resource[F, Tracer[F] => Resource[F, Session[F]]] = {

def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F])(
implicit T: Trace[F]
implicit T: Tracer[F]
): Resource[F, Session[F]] =
for {
pc <- Resource.eval(Parse.Cache.empty[F](parseCache))
Expand All @@ -159,11 +161,11 @@ private[dumbo] object Session {
for {
dc <- Resource.eval(Describe.Cache.empty[F](commandCache, queryCache))
sslOp <- ssl.toSSLNegotiationOptions(if (debug) logger.some else none)
pool <- Pool.ofF((T: Trace[F]) => session(Network[F], sslOp, dc)(T), max)(Session.Recyclers.full)
pool <- Pool.ofF((T: Tracer[F]) => session(Network[F], sslOp, dc)(T), max)(Session.Recyclers.full)
} yield pool
}

def fromSocketGroup[F[_]: Temporal: Trace: Console](
def fromSocketGroup[F[_]: Tracer: Console](
socketGroup: SocketGroup[F],
host: String,
port: Int = 5432,
Expand All @@ -178,51 +180,73 @@ private[dumbo] object Session {
describeCache: Describe.Cache[F],
parseCache: Parse.Cache[F],
readTimeout: Duration = Duration.Inf,
): Resource[F, Session[F]] =
redactionStrategy: RedactionStrategy = RedactionStrategy.OptIn,
)(implicit ev: Temporal[F]): Resource[F, Session[F]] = {
def fail[A](msg: String): Resource[F, A] =
Resource.eval(ev.raiseError(new Exception(msg)))

def sock: Resource[F, Socket[F]] =
(Hostname.fromString(host), Port.fromInt(port)) match {
case (Some(validHost), Some(validPort)) =>
socketGroup.client(SocketAddress(validHost, validPort), socketOptions)
case (None, _) => fail(s"""Hostname: "$host" is not syntactically valid.""")
case (_, None) => fail(s"Port: $port falls out of the allowed range.")
}

for {
namer <- Resource.eval(Namer[F])
proto <- Protocol.apply[F](
host,
port,
debug,
namer,
socketGroup,
socketOptions,
sslOptions,
describeCache,
parseCache,
readTimeout,
)
_ <- Resource.eval(proto.startup(user, database, password, parameters))
sess <- Resource.make(SkunkSession.fromProtocol(proto, namer, strategy))(_ => proto.cleanup)
} yield new Session[F] {
override def execute_(statement: Statement[Void]): F[Unit] = proto.execute_(statement)
override def parameters: Signal[F, Map[String, String]] = sess.parameters
def parameter(key: String): Stream[F, String] = sess.parameter(key)
def transactionStatus: Signal[F, TransactionStatus] = sess.transactionStatus
def execute[A](query: Query[Void, A]): F[List[A]] = sess.execute(query)
def execute[A, B](query: Query[A, B])(args: A): F[List[B]] = sess.execute(query)(args)
def unique[A](query: Query[Void, A]): F[A] = sess.unique(query)
def unique[A, B](query: Query[A, B])(args: A): F[B] = sess.unique(query)(args)
def option[A](query: Query[Void, A]): F[Option[A]] = sess.option(query)
def option[A, B](query: Query[A, B])(args: A): F[Option[B]] = sess.option(query)(args)
def stream[A, B](command: Query[A, B])(args: A, chunkSize: Int): Stream[F, B] =
sess.stream(command)(args, chunkSize)
def cursor[A, B](query: Query[A, B])(args: A): Resource[F, Cursor[F, B]] = sess.cursor(query)(args)
def execute(command: Command[Void]): F[Completion] = sess.execute(command)
def execute[A](command: Command[A])(args: A): F[Completion] = sess.execute(command)(args)
def prepare[A, B](query: Query[A, B]): F[PreparedQuery[F, A, B]] = sess.prepare(query)
def prepare[A](command: Command[A]): F[PreparedCommand[F, A]] = sess.prepare(command)
def pipe[A](command: Command[A]): Pipe[F, A, Completion] = sess.pipe(command)
def pipe[A, B](query: Query[A, B], chunkSize: Int): Pipe[F, A, B] = sess.pipe(query, chunkSize)
def channel(name: Identifier): Channel[F, String, String] = sess.channel(name)
def transaction[A]: Resource[F, Transaction[F]] = sess.transaction
def transaction[A](
isolationLevel: TransactionIsolationLevel,
accessMode: TransactionAccessMode,
): Resource[F, Transaction[F]] = sess.transaction(isolationLevel, accessMode)
def typer: Typer = sess.typer
def describeCache: Describe.Cache[F] = sess.describeCache
def parseCache: Parse.Cache[F] = sess.parseCache
proto <- Protocol[F](debug, namer, sock, sslOptions, describeCache, parseCache, readTimeout, redactionStrategy)
_ <- Resource.eval(proto.startup(user, database, password, parameters))
sess <- Resource.make(fromProtocol(proto, namer, strategy, redactionStrategy))(_ => proto.cleanup)
} yield sess
}

def fromProtocol[F[_]](
proto: Protocol[F],
namer: Namer[F],
strategy: Typer.Strategy,
redactionStrategy: RedactionStrategy,
)(implicit ev: MonadCancel[F, Throwable]): F[Session[F]] = {
val ft: F[Typer] =
strategy match {
case BuiltinsOnly => Typer.Static.pure[F]
case SearchPath => Typer.fromProtocol(proto)
}

ft.map { typ =>
new SkunkSession.Impl[F] with Session[F] {
override val typer: Typer = typ
override def execute_(statement: Statement[Void]): F[Unit] = proto.execute_(statement)
override def execute(command: Command[Void]): F[Completion] = proto.execute(command)
override def channel(name: Identifier): Channel[F, String, String] = Channel.fromNameAndProtocol(name, proto)
override def parameters: Signal[F, Map[String, String]] = proto.parameters
override def parameter(key: String): Stream[F, String] = parameters.discrete.map(_.get(key)).unNone.changes
override def transactionStatus: Signal[F, TransactionStatus] = proto.transactionStatus
override def execute[A](query: Query[Void, A]): F[List[A]] = proto.execute(query, typer)
override def unique[A](query: Query[Void, A]): F[A] =
execute(query).flatMap {
case a :: Nil => a.pure[F]
case Nil => ev.raiseError(new RuntimeException("Expected exactly one row, none returned."))
case _ => ev.raiseError(new RuntimeException("Expected exactly one row, more returned."))
}
override def option[A](query: Query[Void, A]): F[Option[A]] =
execute(query).flatMap {
case a :: Nil => a.some.pure[F]
case Nil => none[A].pure[F]
case _ => ev.raiseError(new RuntimeException("Expected at most one row, more returned."))
}
override def prepare[A, B](query: Query[A, B]): F[PreparedQuery[F, A, B]] =
proto.prepare(query, typer).map(PreparedQuery.fromProto(_, redactionStrategy))
override def prepare[A](command: Command[A]): F[PreparedCommand[F, A]] =
proto.prepare(command, typer).map(PreparedCommand.fromProto(_))
override def transaction[A]: Resource[F, Transaction[F]] = Transaction.fromSession(this, namer, none, none)
override def transaction[A](
isolationLevel: TransactionIsolationLevel,
accessMode: TransactionAccessMode,
): Resource[F, Transaction[F]] = Transaction.fromSession(this, namer, isolationLevel.some, accessMode.some)
override def describeCache: Describe.Cache[F] = proto.describeCache
override def parseCache: Parse.Cache[F] = proto.parseCache
}
}
}
}
Loading

0 comments on commit 95454db

Please sign in to comment.