diff --git a/README.md b/README.md index fcad07f3..7e4c3df5 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ redis4cats Redis client built on top of [Cats Effect](https://typelevel.org/cats-effect/), [Fs2](http://fs2.io/) and the async Java client [Lettuce](https://lettuce.io/). -> **NOTE**: Neither binary compatibility nor API stability are guaranteed between releases. +> **NOTE**: Neither binary compatibility nor API stability will be guaranteed until we reach `1.0.0`. `redis4cats` defines two types of API: an effect-based using [Cats Effect](https://typelevel.org/cats-effect/) and a stream-based using [Fs2](http://fs2.io/). diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClient.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClient.scala index b8d91d07..216e1717 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClient.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClient.scala @@ -20,6 +20,7 @@ import cats.effect._ import cats.syntax.apply._ import cats.syntax.functor._ import dev.profunktor.redis4cats.effect.{ JRFuture, Log } +import dev.profunktor.redis4cats.effect.JRFuture._ import io.lettuce.core.{ RedisClient => JRedisClient, RedisURI => JRedisURI } sealed abstract case class RedisClient private (underlying: JRedisClient, uri: RedisURI) @@ -27,7 +28,8 @@ sealed abstract case class RedisClient private (underlying: JRedisClient, uri: R object RedisClient { private[redis4cats] def acquireAndRelease[F[_]: Concurrent: ContextShift: Log]( - uri: => RedisURI + uri: => RedisURI, + blocker: Blocker ): (F[RedisClient], RedisClient => F[Unit]) = { val acquire: F[RedisClient] = F.delay { val jClient: JRedisClient = JRedisClient.create(uri.underlying) @@ -36,19 +38,21 @@ object RedisClient { val release: RedisClient => F[Unit] = client => F.info(s"Releasing Redis connection: $uri") *> - JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync())).void + JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync()))(blocker).void (acquire, release) } - private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: Concurrent: ContextShift: Log] - : F[(F[RedisClient], RedisClient => F[Unit])] = - F.delay(RedisURI.fromUnderlying(new JRedisURI())).map(acquireAndRelease(_)) + private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: Concurrent: ContextShift: Log]( + blocker: Blocker + ): F[(F[RedisClient], RedisClient => F[Unit])] = + F.delay(RedisURI.fromUnderlying(new JRedisURI())).map(uri => acquireAndRelease(uri, blocker)) - def apply[F[_]: Concurrent: ContextShift: Log](uri: => RedisURI): Resource[F, RedisClient] = { - val (acquire, release) = acquireAndRelease(uri) - Resource.make(acquire)(release) - } + def apply[F[_]: Concurrent: ContextShift: Log](uri: => RedisURI): Resource[F, RedisClient] = + mkBlocker[F].flatMap { blocker => + val (acquire, release) = acquireAndRelease(uri, blocker) + Resource.make(acquire)(release) + } def fromUnderlyingWithUri(underlying: JRedisClient, uri: RedisURI): RedisClient = new RedisClient(underlying, uri) {} diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala index 8b84dc2e..0d1c0c2c 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala +++ 
b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala @@ -18,18 +18,19 @@ package dev.profunktor.redis4cats.connection import cats.effect._ import cats.implicits._ +import dev.profunktor.redis4cats.JavaConversions._ import dev.profunktor.redis4cats.domain.NodeId import dev.profunktor.redis4cats.effect.{ JRFuture, Log } +import dev.profunktor.redis4cats.effect.JRFuture._ import io.lettuce.core.cluster.{ SlotHash, RedisClusterClient => JClusterClient } import io.lettuce.core.cluster.models.partitions.{ Partitions => JPartitions } -import dev.profunktor.redis4cats.JavaConversions._ - sealed abstract case class RedisClusterClient private (underlying: JClusterClient) object RedisClusterClient { private[redis4cats] def acquireAndRelease[F[_]: Concurrent: ContextShift: Log]( + blocker: Blocker, uri: RedisURI* ): (F[RedisClusterClient], RedisClusterClient => F[Unit]) = { @@ -41,7 +42,7 @@ object RedisClusterClient { val release: RedisClusterClient => F[Unit] = client => F.info(s"Releasing Redis Cluster client: ${client.underlying}") *> - JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync())).void + JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync()))(blocker).void (acquire, release) } @@ -49,10 +50,11 @@ object RedisClusterClient { private[redis4cats] def initializeClusterPartitions[F[_]: Sync](client: JClusterClient): F[Unit] = F.delay(client.getPartitions).void - def apply[F[_]: Concurrent: ContextShift: Log](uri: RedisURI*): Resource[F, RedisClusterClient] = { - val (acquire, release) = acquireAndRelease(uri: _*) - Resource.make(acquire)(release) - } + def apply[F[_]: Concurrent: ContextShift: Log](uri: RedisURI*): Resource[F, RedisClusterClient] = + mkBlocker[F].flatMap { blocker => + val (acquire, release) = acquireAndRelease(blocker, uri: _*) + Resource.make(acquire)(release) + } def fromUnderlying(underlying: JClusterClient): RedisClusterClient = new RedisClusterClient(underlying) {} diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala index 486ff8e4..46d73d70 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala @@ -22,8 +22,10 @@ import dev.profunktor.redis4cats.domain.NodeId import dev.profunktor.redis4cats.effect.JRFuture import io.lettuce.core.api.StatefulRedisConnection import io.lettuce.core.api.async.RedisAsyncCommands +import io.lettuce.core.api.sync.{ RedisCommands => RedisSyncCommands } import io.lettuce.core.cluster.api.StatefulRedisClusterConnection import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands +import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands } import scala.util.control.NoStackTrace case class OperationNotSupported(value: String) extends NoStackTrace { @@ -31,6 +33,8 @@ case class OperationNotSupported(value: String) extends NoStackTrace { } private[redis4cats] trait RedisConnection[F[_], K, V] { + def sync: F[RedisSyncCommands[K, V]] + def clusterSync: F[RedisClusterSyncCommands[K, V]] def async: F[RedisAsyncCommands[K, V]] def clusterAsync: F[RedisClusterAsyncCommands[K, V]] def close: F[Unit] @@ -39,31 +43,42 @@ private[redis4cats] trait RedisConnection[F[_], K, V] { } private[redis4cats] class RedisStatefulConnection[F[_]: Concurrent: ContextShift, K, V]( - conn: 
StatefulRedisConnection[K, V] + conn: StatefulRedisConnection[K, V], + blocker: Blocker ) extends RedisConnection[F, K, V] { + def sync: F[RedisSyncCommands[K, V]] = F.delay(conn.sync()) + def clusterSync: F[RedisClusterSyncCommands[K, V]] = + F.raiseError(OperationNotSupported("Running in a single node")) def async: F[RedisAsyncCommands[K, V]] = F.delay(conn.async()) def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = F.raiseError(OperationNotSupported("Running in a single node")) - def close: F[Unit] = JRFuture.fromCompletableFuture(F.delay(conn.closeAsync())).void + def close: F[Unit] = JRFuture.fromCompletableFuture(F.delay(conn.closeAsync()))(blocker).void def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] = F.raiseError(OperationNotSupported("Running in a single node")) - def liftK[G[_]: Concurrent: ContextShift]: RedisConnection[G, K, V] = new RedisStatefulConnection[G, K, V](conn) + def liftK[G[_]: Concurrent: ContextShift]: RedisConnection[G, K, V] = + new RedisStatefulConnection[G, K, V](conn, blocker) } private[redis4cats] class RedisStatefulClusterConnection[F[_]: Concurrent: ContextShift, K, V]( - conn: StatefulRedisClusterConnection[K, V] + conn: StatefulRedisClusterConnection[K, V], + blocker: Blocker ) extends RedisConnection[F, K, V] { + def sync: F[RedisSyncCommands[K, V]] = + F.raiseError( + OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.") + ) def async: F[RedisAsyncCommands[K, V]] = F.raiseError( OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.") ) def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = F.delay(conn.async()) + def clusterSync: F[RedisClusterSyncCommands[K, V]] = F.delay(conn.sync()) def close: F[Unit] = - JRFuture.fromCompletableFuture(F.delay(conn.closeAsync())).void + JRFuture.fromCompletableFuture(F.delay(conn.closeAsync()))(blocker).void def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] = - JRFuture.fromCompletableFuture(F.delay(conn.getConnectionAsync(nodeId.value))).flatMap { stateful => + JRFuture.fromCompletableFuture(F.delay(conn.getConnectionAsync(nodeId.value)))(blocker).flatMap { stateful => F.delay(stateful.async()) } def liftK[G[_]: Concurrent: ContextShift]: RedisConnection[G, K, V] = - new RedisStatefulClusterConnection[G, K, V](conn) + new RedisStatefulClusterConnection[G, K, V](conn, blocker) } diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisMasterReplica.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisMasterReplica.scala index d52da77b..8e7a73b5 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisMasterReplica.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisMasterReplica.scala @@ -18,13 +18,13 @@ package dev.profunktor.redis4cats.connection import cats.effect._ import cats.syntax.all._ +import dev.profunktor.redis4cats.JavaConversions._ import dev.profunktor.redis4cats.domain._ import dev.profunktor.redis4cats.effect.{ JRFuture, Log } +import dev.profunktor.redis4cats.effect.JRFuture._ import io.lettuce.core.masterreplica.{ MasterReplica, StatefulRedisMasterReplicaConnection } import io.lettuce.core.{ ReadFrom => JReadFrom } -import dev.profunktor.redis4cats.JavaConversions._ - sealed abstract case class RedisMasterReplica[K, V] private (underlying: StatefulRedisMasterReplicaConnection[K, V]) object RedisMasterReplica { @@ -33,6 +33,7 @@ object RedisMasterReplica { client: RedisClient, codec: 
RedisCodec[K, V], readFrom: Option[JReadFrom], + blocker: Blocker, uris: RedisURI* ): (F[RedisMasterReplica[K, V]], RedisMasterReplica[K, V] => F[Unit]) = { @@ -40,11 +41,11 @@ object RedisMasterReplica { val connection: F[RedisMasterReplica[K, V]] = JRFuture - .fromCompletableFuture[F, StatefulRedisMasterReplicaConnection[K, V]] { + .fromCompletableFuture[F, StatefulRedisMasterReplicaConnection[K, V]]( F.delay { MasterReplica.connectAsync[K, V](client.underlying, codec.underlying, uris.map(_.underlying).asJava) } - } + )(blocker) .map(new RedisMasterReplica(_) {}) readFrom.fold(connection)(rf => connection.flatMap(c => F.delay(c.underlying.setReadFrom(rf)) *> c.pure[F])) @@ -52,7 +53,7 @@ object RedisMasterReplica { val release: RedisMasterReplica[K, V] => F[Unit] = connection => F.info(s"Releasing Redis Master/Replica connection: ${connection.underlying}") *> - JRFuture.fromCompletableFuture(F.delay(connection.underlying.closeAsync())).void + JRFuture.fromCompletableFuture(F.delay(connection.underlying.closeAsync()))(blocker).void (acquire, release) } @@ -61,12 +62,14 @@ object RedisMasterReplica { codec: RedisCodec[K, V], uris: RedisURI* )(readFrom: Option[JReadFrom] = None): Resource[F, RedisMasterReplica[K, V]] = - Resource.liftF(RedisClient.acquireAndReleaseWithoutUri[F]).flatMap { - case (acquireClient, releaseClient) => - Resource.make(acquireClient)(releaseClient).flatMap { client => - val (acquire, release) = acquireAndRelease(client, codec, readFrom, uris: _*) - Resource.make(acquire)(release) - } + mkBlocker[F].flatMap { blocker => + Resource.liftF(RedisClient.acquireAndReleaseWithoutUri[F](blocker)).flatMap { + case (acquireClient, releaseClient) => + Resource.make(acquireClient)(releaseClient).flatMap { client => + val (acquire, release) = acquireAndRelease(client, codec, readFrom, blocker, uris: _*) + Resource.make(acquire)(release) + } + } } def fromUnderlying[K, V](underlying: StatefulRedisMasterReplicaConnection[K, V]) = diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/JRFuture.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/JRFuture.scala index 2e3f9f89..0a4685bc 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/JRFuture.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/JRFuture.scala @@ -16,40 +16,77 @@ package dev.profunktor.redis4cats.effect -import java.util.concurrent.{ CompletableFuture, CompletionStage, Future } - -import cats.effect.{ Async, ContextShift } -import cats.implicits._ +import cats.effect._ import cats.effect.implicits._ +import cats.implicits._ import io.lettuce.core.{ ConnectionFuture, RedisFuture } +import java.util.concurrent._ + +trait RedisBlocker { + def ec: Blocker +} + +object RedisBlocker { + def apply(blocker: Blocker): RedisBlocker = + new RedisBlocker { + def ec: Blocker = blocker + } +} object JRFuture { private[redis4cats] type JFuture[A] = CompletionStage[A] with Future[A] - def apply[F[_]: Async: ContextShift, A](fa: F[RedisFuture[A]]): F[A] = - liftJFuture[F, RedisFuture[A], A](fa) + private[redis4cats] def mkBlocker[F[_]: Sync]: Resource[F, Blocker] = + Blocker.fromExecutorService(F.delay(Executors.newFixedThreadPool(1))) + + def apply[F[_]: Concurrent: ContextShift, A]( + fa: F[RedisFuture[A]] + )(blocker: Blocker): F[A] = + liftJFuture[F, RedisFuture[A], A](fa)(blocker) + + def fromConnectionFuture[F[_]: Concurrent: ContextShift, A]( + fa: F[ConnectionFuture[A]] + )(blocker: Blocker): F[A] = + liftJFuture[F, ConnectionFuture[A], 
A](fa)(blocker) - def fromConnectionFuture[F[_]: Async: ContextShift, A](fa: F[ConnectionFuture[A]]): F[A] = - liftJFuture[F, ConnectionFuture[A], A](fa) + def fromCompletableFuture[F[_]: Concurrent: ContextShift, A]( + fa: F[CompletableFuture[A]] + )(blocker: Blocker): F[A] = + liftJFuture[F, CompletableFuture[A], A](fa)(blocker) - def fromCompletableFuture[F[_]: Async: ContextShift, A](fa: F[CompletableFuture[A]]): F[A] = - liftJFuture[F, CompletableFuture[A], A](fa) + implicit class FutureLiftOps[F[_]: Concurrent: ContextShift, A](fa: F[RedisFuture[A]]) { + def futureLift(implicit rb: RedisBlocker): F[A] = + liftJFuture[F, RedisFuture[A], A](fa)(rb.ec) + } private[redis4cats] def liftJFuture[ - F[_]: Async: ContextShift, + F[_]: Concurrent: ContextShift, G <: JFuture[A], A - ](fa: F[G]): F[A] = - fa.flatMap { f => - F.async[A] { cb => - f.handle[Unit] { (value: A, t: Throwable) => - if (t != null) cb(Left(t)) - else cb(Right(value)) + ](fa: F[G])(blocker: Blocker): F[A] = { + val lifted: F[A] = blocker.blockOn { + fa.flatMap { f => + blocker.blockOn { + F.cancelable { cb => + f.handle[Unit] { (res: A, err: Throwable) => + err match { + case null => + cb(Right(res)) + case _: CancellationException => + () + case ex: CompletionException if ex.getCause ne null => + cb(Left(ex.getCause)) + case ex => + cb(Left(ex)) + } + } + blocker.delay(f.cancel(true)).void } - () } - .guarantee(F.shift) + } } + lifted.guarantee(F.shift) + } } diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala new file mode 100644 index 00000000..af8a9a9d --- /dev/null +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2018-2020 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats + +/** + * A heterogeneous list, mainly used to operate on transactions. + * + * Highly inspired by Shapeless machinery but very much lightweight. + */ +object hlist { + + type ::[H, T <: HList] = HCons[H, T] + type HNil = HNil.type + + sealed trait HList { + def ::[A](a: A): HCons[A, this.type] = HCons(a, this) + } + + final case class HCons[+H, +Tail <: HList](head: H, tail: Tail) extends HList + case object HNil extends HList + + object ~: { + def unapply[H, T <: HList](l: H :: T): Some[(H, T)] = Some((l.head, l.tail)) + } + + /** + * It witnesses a relationship between two HLists. + * + * The existing instances model a relationship between an HList composed + * of actions F[A] and results A. E.g.: + * + * {{{ + * val actions: IO[Unit] :: IO[String] :: HNil = IO.unit :: IO.pure("hi") :: HNil + * val results: actions.R = () :: "hi" :: HNil + * }}} + * + * A Witness[IO[Unit] :: IO[String] :: HNil] proves that its result type can + * only be Unit :: String :: HNil. + * + * A Witness is sealed to avoid the creation of invalid instances. 
+ */ + sealed trait Witness[T <: HList] { + type R <: HList + } + + object Witness { + type Aux[T0 <: HList, R0 <: HList] = Witness[T0] { type R = R0 } + + implicit val hnil: Witness.Aux[HNil, HNil] = + new Witness[HNil] { type R = HNil } + + implicit def hcons[F[_], A, T <: HList](implicit w: Witness[T]): Witness.Aux[HCons[F[A], T], HCons[A, w.R]] = + new Witness[HCons[F[A], T]] { type R = HCons[A, w.R] } + } + +} diff --git a/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala b/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala new file mode 100644 index 00000000..4f91669f --- /dev/null +++ b/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2020 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats + +import cats.effect.IO +import hlist._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class HListSpec extends AnyFunSuite with Matchers { + + test("HList and Witness") { + def proof[T <: HList, R <: HList](xs: T)(implicit w: Witness.Aux[T, R]): R = + xs.asInstanceOf[w.R] // can return anything, we only care about the types here + + val actions = IO.unit :: IO.pure("hi") :: HNil + + proof(actions): Unit :: String :: HNil + + "proof(actions): Unit :: Int :: HNil" shouldNot typeCheck + } + + test("Unapply HLists (deconstruct)") { + val hl = () :: "hi" :: 123 :: true :: 's' :: 55 :: HNil + + val u ~: s ~: n1 ~: b ~: c ~: n2 ~: HNil = hl + + assert(u.isInstanceOf[Unit]) + assert(s.isInstanceOf[String]) + assert(n1.isInstanceOf[Int]) + assert(b.isInstanceOf[Boolean]) + assert(c.isInstanceOf[Char]) + assert(n2.isInstanceOf[Int]) + } + +} diff --git a/modules/core/src/test/scala/dev/profunktor/redis4cats/effect/JRFutureSpec.scala b/modules/core/src/test/scala/dev/profunktor/redis4cats/effect/JRFutureSpec.scala index 54863648..204b349d 100644 --- a/modules/core/src/test/scala/dev/profunktor/redis4cats/effect/JRFutureSpec.scala +++ b/modules/core/src/test/scala/dev/profunktor/redis4cats/effect/JRFutureSpec.scala @@ -18,24 +18,27 @@ package dev.profunktor.redis4cats.effect import java.util.concurrent.CompletableFuture -import cats.effect.{ ContextShift, IO } +import cats.effect.{ Blocker, ContextShift, IO } import cats.syntax.all._ import dev.profunktor.redis4cats.testutils.Redis4CatsAsyncFunSuite +import scala.concurrent.ExecutionContext class JRFutureSpec extends Redis4CatsAsyncFunSuite { - implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global) + implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) val currentThread: IO[String] = IO(Thread.currentThread().getName) test("it shifts back once the Future is converted") { val ioa = - JRFuture.fromCompletableFuture[IO, String] { - IO { - val jFuture = new CompletableFuture[String]() - jFuture.complete("foo") - jFuture - } + Blocker[IO].use { blocker => + JRFuture.fromCompletableFuture[IO, String] { 
+ IO { + val jFuture = new CompletableFuture[String]() + jFuture.complete("foo") + jFuture + } + }(blocker) } (ioa *> currentThread) @@ -45,12 +48,14 @@ class JRFutureSpec extends Redis4CatsAsyncFunSuite { test("it shifts back even when the CompletableFuture fails") { val ioa = - JRFuture.fromCompletableFuture[IO, String] { - IO { - val jFuture = new CompletableFuture[String]() - jFuture.completeExceptionally(new RuntimeException("Purposely fail")) - jFuture - } + Blocker[IO].use { blocker => + JRFuture.fromCompletableFuture[IO, String] { + IO { + val jFuture = new CompletableFuture[String]() + jFuture.completeExceptionally(new RuntimeException("Purposely fail")) + jFuture + } + }(blocker) } (ioa.attempt *> currentThread) diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala index 71c445ea..3bfcec1c 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala @@ -23,26 +23,40 @@ import cats.implicits._ import dev.profunktor.redis4cats.algebra._ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.domain._ -import dev.profunktor.redis4cats.effect.{JRFuture, Log} +import dev.profunktor.redis4cats.effect.{ JRFuture, Log, RedisBlocker } +import dev.profunktor.redis4cats.effect.JRFuture._ import dev.profunktor.redis4cats.effects._ -import io.lettuce.core.{GeoArgs, GeoRadiusStoreArgs, GeoWithin, ScanCursor, ScoredValue, ZAddArgs, ZStoreArgs, Limit => JLimit, Range => JRange, SetArgs => JSetArgs} +import dev.profunktor.redis4cats.transactions.TransactionDiscarded +import io.lettuce.core.{ + GeoArgs, + GeoRadiusStoreArgs, + GeoWithin, + ScanCursor, + ScoredValue, + ZAddArgs, + ZStoreArgs, + Limit => JLimit, + Range => JRange, + SetArgs => JSetArgs +} import io.lettuce.core.api.async.RedisAsyncCommands import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands +import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands } import java.util.concurrent.TimeUnit - import scala.concurrent.duration._ object Redis { private[redis4cats] def acquireAndRelease[F[_]: Concurrent: ContextShift: Log, K, V]( client: RedisClient, - codec: RedisCodec[K, V] + codec: RedisCodec[K, V], + blocker: Blocker ): (F[Redis[F, K, V]], Redis[F, K, V] => F[Unit]) = { val acquire = JRFuture - .fromConnectionFuture { + .fromConnectionFuture( F.delay(client.underlying.connectAsync(codec.underlying, client.uri.underlying)) - } - .map(c => new Redis(new RedisStatefulConnection(c))) + )(blocker) + .map(c => new Redis(new RedisStatefulConnection(c, blocker), blocker)) val release: Redis[F, K, V] => F[Unit] = c => F.info(s"Releasing Commands connection: ${client.uri.underlying}") *> c.conn.close @@ -52,13 +66,14 @@ object Redis { private[redis4cats] def acquireAndReleaseCluster[F[_]: Concurrent: ContextShift: Log, K, V]( client: RedisClusterClient, - codec: RedisCodec[K, V] + codec: RedisCodec[K, V], + blocker: Blocker ): (F[RedisCluster[F, K, V]], RedisCluster[F, K, V] => F[Unit]) = { val acquire = JRFuture - .fromCompletableFuture { + .fromCompletableFuture( F.delay(client.underlying.connectAsync[K, V](codec.underlying)) - } - .map(c => new RedisCluster(new RedisStatefulClusterConnection(c))) + )(blocker) + .map(c => new RedisCluster(new RedisStatefulClusterConnection(c, blocker), blocker)) val release: RedisCluster[F, K, V] => 
F[Unit] = c => F.info(s"Releasing cluster Commands connection: ${client.underlying}") *> c.conn.close @@ -69,14 +84,15 @@ object Redis { private[redis4cats] def acquireAndReleaseClusterByNode[F[_]: Concurrent: ContextShift: Log, K, V]( client: RedisClusterClient, codec: RedisCodec[K, V], - nodeId: NodeId + nodeId: NodeId, + blocker: Blocker ): (F[BaseRedis[F, K, V]], BaseRedis[F, K, V] => F[Unit]) = { val acquire = JRFuture - .fromCompletableFuture { + .fromCompletableFuture( F.delay(client.underlying.connectAsync[K, V](codec.underlying)) - } + )(blocker) .map { c => - new BaseRedis[F, K, V](new RedisStatefulClusterConnection(c), cluster = true) { + new BaseRedis[F, K, V](new RedisStatefulClusterConnection(c, blocker), cluster = true, blocker) { override def async: F[RedisClusterAsyncCommands[K, V]] = if (cluster) conn.byNode(nodeId).widen[RedisClusterAsyncCommands[K, V]] else conn.async.widen[RedisClusterAsyncCommands[K, V]] @@ -92,132 +108,155 @@ object Redis { def apply[F[_]: Concurrent: ContextShift: Log, K, V]( client: RedisClient, codec: RedisCodec[K, V] - ): Resource[F, RedisCommands[F, K, V]] = { - val (acquire, release) = acquireAndRelease(client, codec) - Resource.make(acquire)(release).widen - } + ): Resource[F, RedisCommands[F, K, V]] = + mkBlocker[F].flatMap { blocker => + val (acquire, release) = acquireAndRelease(client, codec, blocker) + Resource.make(acquire)(release).widen + } def cluster[F[_]: Concurrent: ContextShift: Log, K, V]( clusterClient: RedisClusterClient, codec: RedisCodec[K, V] - ): Resource[F, RedisCommands[F, K, V]] = { - val (acquire, release) = acquireAndReleaseCluster(clusterClient, codec) - Resource.make(acquire)(release).widen - } + ): Resource[F, RedisCommands[F, K, V]] = + mkBlocker[F].flatMap { blocker => + val (acquire, release) = acquireAndReleaseCluster(clusterClient, codec, blocker) + Resource.make(acquire)(release).widen + } def clusterByNode[F[_]: Concurrent: ContextShift: Log, K, V]( clusterClient: RedisClusterClient, codec: RedisCodec[K, V], nodeId: NodeId - ): Resource[F, RedisCommands[F, K, V]] = { - val (acquire, release) = acquireAndReleaseClusterByNode(clusterClient, codec, nodeId) - Resource.make(acquire)(release).widen - } + ): Resource[F, RedisCommands[F, K, V]] = + mkBlocker[F].flatMap { blocker => + val (acquire, release) = acquireAndReleaseClusterByNode(clusterClient, codec, nodeId, blocker) + Resource.make(acquire)(release).widen + } def masterReplica[F[_]: Concurrent: ContextShift: Log, K, V]( conn: RedisMasterReplica[K, V] - ): F[RedisCommands[F, K, V]] = - F.delay(new RedisStatefulConnection(conn.underlying)) - .map(new Redis[F, K, V](_)) + ): Resource[F, RedisCommands[F, K, V]] = + mkBlocker[F].flatMap { blocker => + Resource.liftF { + F.delay(new RedisStatefulConnection(conn.underlying, blocker)) + .map(c => new Redis[F, K, V](c, blocker)) + } + } } private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( val conn: RedisConnection[F, K, V], - val cluster: Boolean + val cluster: Boolean, + blocker: Blocker ) extends RedisCommands[F, K, V] with RedisConversionOps { override def liftK[G[_]: Concurrent: ContextShift]: BaseRedis[G, K, V] = - new BaseRedis[G, K, V](conn.liftK[G], cluster) + new BaseRedis[G, K, V](conn.liftK[G], cluster, blocker) import dev.profunktor.redis4cats.JavaConversions._ + // To avoid passing it to every `futureLift` + implicit val redisBlocker = RedisBlocker(blocker) + def async: F[RedisClusterAsyncCommands[K, V]] = - if (cluster) conn.clusterAsync else 
conn.async.widen[RedisClusterAsyncCommands[K, V]] + if (cluster) conn.clusterAsync else conn.async.widen + + def sync: F[RedisClusterSyncCommands[K, V]] = + if (cluster) conn.clusterSync else conn.sync.widen /******************************* Keys API *************************************/ def del(key: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.del(key: _*))) - }.void + async.flatMap(c => F.delay(c.del(key: _*))).futureLift.void def exists(key: K*): F[Boolean] = - JRFuture { - async.flatMap(c => F.delay(c.exists(key: _*))) - }.map(x => x == key.size.toLong) + async + .flatMap(c => F.delay(c.exists(key: _*))) + .futureLift + .map(x => x == key.size.toLong) def expire(key: K, expiresIn: FiniteDuration): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.expire(key, expiresIn.toSeconds))) - }.void + async.flatMap(c => F.delay(c.expire(key, expiresIn.toSeconds))).futureLift.void def ttl(key: K): F[Option[FiniteDuration]] = - JRFuture { - async.flatMap(c => F.delay(c.ttl(key))) - }.map { - case d if d == -2 || d == -1 => none[FiniteDuration] - case d => FiniteDuration(d, TimeUnit.SECONDS).some - } + async + .flatMap(c => F.delay(c.ttl(key))) + .futureLift + .map { + case d if d == -2 || d == -1 => none[FiniteDuration] + case d => FiniteDuration(d, TimeUnit.SECONDS).some + } def pttl(key: K): F[Option[FiniteDuration]] = - JRFuture { - async.flatMap(c => F.delay(c.pttl(key))) - }.map { - case d if d == -2 || d == -1 => none[FiniteDuration] - case d => FiniteDuration(d, TimeUnit.MILLISECONDS).some - } + async + .flatMap(c => F.delay(c.pttl(key))) + .futureLift + .map { + case d if d == -2 || d == -1 => none[FiniteDuration] + case d => FiniteDuration(d, TimeUnit.MILLISECONDS).some + } def scan: F[KeyScanCursor[K]] = - JRFuture { - async.flatMap(c => F.delay(c.scan())) - }.map(KeyScanCursor[K]) + async + .flatMap(c => F.delay(c.scan())) + .futureLift + .map(KeyScanCursor[K]) def scan(cursor: Long): F[KeyScanCursor[K]] = - JRFuture { - async.flatMap(c => F.delay(c.scan(ScanCursor.of(cursor.toString)))) - }.map(KeyScanCursor[K]) + async + .flatMap(c => F.delay(c.scan(ScanCursor.of(cursor.toString)))) + .futureLift + .map(KeyScanCursor[K]) /******************************* Transactions API **********************************/ - // When in a cluster, transactions should run against a single node, therefore we use `conn.async` instead of `conn.clusterAsync`. + // When in a cluster, transactions should run against a single node. 
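// Illustrative sketch, not part of this change: with the implicit `redisBlocker` above in scope,
// a single-node transaction composes as plain effects through the methods below, roughly
//   multi *> set(k1, v1) *> set(k2, v2) *> exec
// and `exec` now raises `TransactionDiscarded` when the server reports that the MULTI block was
// discarded (for instance because a WATCHed key changed).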
def multi: F[Unit] = - JRFuture { - async.flatMap { + async + .flatMap { case c: RedisAsyncCommands[K, V] => F.delay(c.multi()) case _ => conn.async.flatMap(c => F.delay(c.multi())) } - }.void + .futureLift + .void def exec: F[Unit] = - JRFuture { - async.flatMap { + async + .flatMap { case c: RedisAsyncCommands[K, V] => F.delay(c.exec()) case _ => conn.async.flatMap(c => F.delay(c.exec())) } - }.void + .futureLift + .flatMap { + case res if res.wasDiscarded() => F.raiseError(TransactionDiscarded) + case _ => F.unit + } def discard: F[Unit] = - JRFuture { - async.flatMap { + async + .flatMap { case c: RedisAsyncCommands[K, V] => F.delay(c.discard()) case _ => conn.async.flatMap(c => F.delay(c.discard())) } - }.void + .futureLift + .void def watch(keys: K*): F[Unit] = - JRFuture { - async.flatMap { + async + .flatMap { case c: RedisAsyncCommands[K, V] => F.delay(c.watch(keys: _*)) case _ => conn.async.flatMap(c => F.delay(c.watch(keys: _*))) } - }.void + .futureLift + .void def unwatch: F[Unit] = - JRFuture { - async.flatMap { + async + .flatMap { case c: RedisAsyncCommands[K, V] => F.delay(c.unwatch()) case _ => conn.async.flatMap(c => F.delay(c.unwatch())) } - }.void + .futureLift + .void /******************************* AutoFlush API **********************************/ override def enableAutoFlush: F[Unit] = @@ -231,19 +270,16 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( /******************************* Strings API **********************************/ override def append(key: K, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.append(key, value))) - }.void + async.flatMap(c => F.delay(c.append(key, value))).futureLift.void override def getSet(key: K, value: V): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.getset(key, value))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.getset(key, value))) + .futureLift + .map(Option.apply) override def set(key: K, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.set(key, value))) - }.void + async.flatMap(c => F.delay(c.set(key, value))).futureLift.void override def set(key: K, value: V, setArgs: SetArgs): F[Boolean] = { val jSetArgs = new JSetArgs() @@ -258,15 +294,17 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( case SetArg.Ttl.Ex(d) => jSetArgs.ex(d.toSeconds) } - JRFuture { - async.flatMap(c => F.delay(c.set(key, value, jSetArgs))) - }.map(_ == "OK") + async + .flatMap(c => F.delay(c.set(key, value, jSetArgs))) + .futureLift + .map(_ == "OK") } override def setNx(key: K, value: V): F[Boolean] = - JRFuture { - async.flatMap(c => F.delay(c.setnx(key, value))) - }.map(x => Boolean.box(x)) + async + .flatMap(c => F.delay(c.setnx(key, value))) + .futureLift + .map(x => Boolean.box(x)) override def setEx(key: K, value: V, expiresIn: FiniteDuration): F[Unit] = { val command = expiresIn.unit match { @@ -275,396 +313,398 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( case _ => async.flatMap(c => F.delay(c.setex(key, expiresIn.toSeconds, value))) } - JRFuture(command).void + command.futureLift.void } override def setRange(key: K, value: V, offset: Long): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.setrange(key, offset, value))) - }.void + async.flatMap(c => F.delay(c.setrange(key, offset, value))).futureLift.void override def decr(key: K)(implicit N: Numeric[V]): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.decr(key))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.decr(key))) + 
.futureLift + .map(x => Long.box(x)) override def decrBy(key: K, amount: Long)(implicit N: Numeric[V]): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.incrby(key, amount))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.incrby(key, amount))) + .futureLift + .map(x => Long.box(x)) override def incr(key: K)(implicit N: Numeric[V]): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.incr(key))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.incr(key))) + .futureLift + .map(x => Long.box(x)) override def incrBy(key: K, amount: Long)(implicit N: Numeric[V]): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.incrby(key, amount))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.incrby(key, amount))) + .futureLift + .map(x => Long.box(x)) override def incrByFloat(key: K, amount: Double)(implicit N: Numeric[V]): F[Double] = - JRFuture { - async.flatMap(c => F.delay(c.incrbyfloat(key, amount))) - }.map(x => Double.box(x)) + async + .flatMap(c => F.delay(c.incrbyfloat(key, amount))) + .futureLift + .map(x => Double.box(x)) override def get(key: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.get(key))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.get(key))) + .futureLift + .map(Option.apply) override def getBit(key: K, offset: Long): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.getbit(key, offset))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.getbit(key, offset))) + .futureLift + .map(x => Option(Long.unbox(x))) override def getRange(key: K, start: Long, end: Long): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.getrange(key, start, end))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.getrange(key, start, end))) + .futureLift + .map(Option.apply) override def strLen(key: K): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.strlen(key))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.strlen(key))) + .futureLift + .map(x => Option(Long.unbox(x))) override def mGet(keys: Set[K]): F[Map[K, V]] = - JRFuture { - async.flatMap(c => F.delay(c.mget(keys.toSeq: _*))) - }.map(_.asScala.toList.collect { case kv if kv.hasValue => kv.getKey -> kv.getValue }.toMap) + async + .flatMap(c => F.delay(c.mget(keys.toSeq: _*))) + .futureLift + .map(_.asScala.toList.collect { case kv if kv.hasValue => kv.getKey -> kv.getValue }.toMap) override def mSet(keyValues: Map[K, V]): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.mset(keyValues.asJava))) - }.void + async.flatMap(c => F.delay(c.mset(keyValues.asJava))).futureLift.void override def mSetNx(keyValues: Map[K, V]): F[Boolean] = - JRFuture { - async.flatMap(c => F.delay(c.msetnx(keyValues.asJava))) - }.map(x => Boolean.box(x)) + async + .flatMap(c => F.delay(c.msetnx(keyValues.asJava))) + .futureLift + .map(x => Boolean.box(x)) override def bitCount(key: K): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.bitcount(key))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.bitcount(key))) + .futureLift + .map(x => Long.box(x)) override def bitCount(key: K, start: Long, end: Long): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.bitcount(key, start, end))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.bitcount(key, start, end))) + .futureLift + .map(x => Long.box(x)) override def bitPos(key: K, state: Boolean): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.bitpos(key, state))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.bitpos(key, 
state))) + .futureLift + .map(x => Long.box(x)) override def bitPos(key: K, state: Boolean, start: Long): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.bitpos(key, state, start))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.bitpos(key, state, start))) + .futureLift + .map(x => Long.box(x)) override def bitPos(key: K, state: Boolean, start: Long, end: Long): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.bitpos(key, state, start, end))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.bitpos(key, state, start, end))) + .futureLift + .map(x => Long.box(x)) override def bitOpAnd(destination: K, sources: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.bitopAnd(destination, sources: _*))) - }.void + async.flatMap(c => F.delay(c.bitopAnd(destination, sources: _*))).futureLift.void override def bitOpNot(destination: K, source: K): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.bitopNot(destination, source))) - }.void + async.flatMap(c => F.delay(c.bitopNot(destination, source))).futureLift.void override def bitOpOr(destination: K, sources: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.bitopOr(destination, sources: _*))) - }.void + async.flatMap(c => F.delay(c.bitopOr(destination, sources: _*))).futureLift.void override def bitOpXor(destination: K, sources: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.bitopXor(destination, sources: _*))) - }.void + async.flatMap(c => F.delay(c.bitopXor(destination, sources: _*))).futureLift.void /******************************* Hashes API **********************************/ override def hDel(key: K, fields: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.hdel(key, fields: _*))) - }.void + async.flatMap(c => F.delay(c.hdel(key, fields: _*))).futureLift.void override def hExists(key: K, field: K): F[Boolean] = - JRFuture { - async.flatMap(c => F.delay(c.hexists(key, field))) - }.map(x => Boolean.box(x)) + async + .flatMap(c => F.delay(c.hexists(key, field))) + .futureLift + .map(x => Boolean.box(x)) override def hGet(key: K, field: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.hget(key, field))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.hget(key, field))) + .futureLift + .map(Option.apply) override def hGetAll(key: K): F[Map[K, V]] = - JRFuture { - async.flatMap(c => F.delay(c.hgetall(key))) - }.map(_.asScala.toMap) + async + .flatMap(c => F.delay(c.hgetall(key))) + .futureLift + .map(_.asScala.toMap) override def hmGet(key: K, fields: K*): F[Map[K, V]] = - JRFuture { - async.flatMap(c => F.delay(c.hmget(key, fields: _*))) - }.map(_.asScala.toList.map(kv => kv.getKey -> kv.getValue).toMap) + async + .flatMap(c => F.delay(c.hmget(key, fields: _*))) + .futureLift + .map(_.asScala.toList.map(kv => kv.getKey -> kv.getValue).toMap) override def hKeys(key: K): F[List[K]] = - JRFuture { - async.flatMap(c => F.delay(c.hkeys(key))) - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.hkeys(key))) + .futureLift + .map(_.asScala.toList) override def hVals(key: K): F[List[V]] = - JRFuture { - async.flatMap(c => F.delay(c.hvals(key))) - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.hvals(key))) + .futureLift + .map(_.asScala.toList) override def hStrLen(key: K, field: K): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.hstrlen(key, field))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.hstrlen(key, field))) + .futureLift + .map(x => Option(Long.unbox(x))) override def hLen(key: K): 
F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.hlen(key))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.hlen(key))) + .futureLift + .map(x => Option(Long.unbox(x))) override def hSet(key: K, field: K, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.hset(key, field, value))) - }.void + async.flatMap(c => F.delay(c.hset(key, field, value))).futureLift.void override def hSetNx(key: K, field: K, value: V): F[Boolean] = - JRFuture { - async.flatMap(c => F.delay(c.hsetnx(key, field, value))) - }.map(x => Boolean.box(x)) + async + .flatMap(c => F.delay(c.hsetnx(key, field, value))) + .futureLift + .map(x => Boolean.box(x)) override def hmSet(key: K, fieldValues: Map[K, V]): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.hmset(key, fieldValues.asJava))) - }.void + async.flatMap(c => F.delay(c.hmset(key, fieldValues.asJava))).futureLift.void override def hIncrBy(key: K, field: K, amount: Long)(implicit N: Numeric[V]): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.hincrby(key, field, amount))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.hincrby(key, field, amount))) + .futureLift + .map(x => Long.box(x)) override def hIncrByFloat(key: K, field: K, amount: Double)(implicit N: Numeric[V]): F[Double] = - JRFuture { - async.flatMap(c => F.delay(c.hincrbyfloat(key, field, amount))) - }.map(x => Double.box(x)) + async + .flatMap(c => F.delay(c.hincrbyfloat(key, field, amount))) + .futureLift + .map(x => Double.box(x)) /******************************* Sets API **********************************/ override def sIsMember(key: K, value: V): F[Boolean] = - JRFuture { - async.flatMap(c => F.delay(c.sismember(key, value))) - }.map(x => Boolean.box(x)) + async + .flatMap(c => F.delay(c.sismember(key, value))) + .futureLift + .map(x => Boolean.box(x)) override def sAdd(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.sadd(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.sadd(key, values: _*))).futureLift.void override def sDiffStore(destination: K, keys: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.sdiffstore(destination, keys: _*))) - }.void + async.flatMap(c => F.delay(c.sdiffstore(destination, keys: _*))).futureLift.void override def sInterStore(destination: K, keys: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.sinterstore(destination, keys: _*))) - }.void + async.flatMap(c => F.delay(c.sinterstore(destination, keys: _*))).futureLift.void override def sMove(source: K, destination: K, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.smove(source, destination, value))) - }.void + async.flatMap(c => F.delay(c.smove(source, destination, value))).futureLift.void override def sPop(key: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.spop(key))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.spop(key))) + .futureLift + .map(Option.apply) override def sPop(key: K, count: Long): F[Set[V]] = - JRFuture { - async.flatMap(c => F.delay(c.spop(key, count))) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.spop(key, count))) + .futureLift + .map(_.asScala.toSet) override def sRem(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.srem(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.srem(key, values: _*))).futureLift.void override def sCard(key: K): F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.scard(key))) - }.map(x => Long.box(x)) + async + .flatMap(c => F.delay(c.scard(key))) + 
.futureLift + .map(x => Long.box(x)) override def sDiff(keys: K*): F[Set[V]] = - JRFuture { - async.flatMap(c => F.delay(c.sdiff(keys: _*))) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.sdiff(keys: _*))) + .futureLift + .map(_.asScala.toSet) override def sInter(keys: K*): F[Set[V]] = - JRFuture { - async.flatMap(c => F.delay(c.sinter(keys: _*))) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.sinter(keys: _*))) + .futureLift + .map(_.asScala.toSet) override def sMembers(key: K): F[Set[V]] = - JRFuture { - async.flatMap(c => F.delay(c.smembers(key))) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.smembers(key))) + .futureLift + .map(_.asScala.toSet) override def sRandMember(key: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.srandmember(key))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.srandmember(key))) + .futureLift + .map(Option.apply) override def sRandMember(key: K, count: Long): F[List[V]] = - JRFuture { - async.flatMap(c => F.delay(c.srandmember(key, count))) - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.srandmember(key, count))) + .futureLift + .map(_.asScala.toList) override def sUnion(keys: K*): F[Set[V]] = - JRFuture { - async.flatMap(c => F.delay(c.sunion(keys: _*))) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.sunion(keys: _*))) + .futureLift + .map(_.asScala.toSet) override def sUnionStore(destination: K, keys: K*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.sunionstore(destination, keys: _*))) - }.void + async.flatMap(c => F.delay(c.sunionstore(destination, keys: _*))).futureLift.void /******************************* Lists API **********************************/ override def lIndex(key: K, index: Long): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.lindex(key, index))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.lindex(key, index))) + .futureLift + .map(Option.apply) override def lLen(key: K): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.llen(key))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.llen(key))) + .futureLift + .map(x => Option(Long.unbox(x))) override def lRange(key: K, start: Long, stop: Long): F[List[V]] = - JRFuture { - async.flatMap(c => F.delay(c.lrange(key, start, stop))) - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.lrange(key, start, stop))) + .futureLift + .map(_.asScala.toList) override def blPop(timeout: Duration, keys: K*): F[(K, V)] = - JRFuture { - async.flatMap(c => F.delay(c.blpop(timeout.toSecondsOrZero, keys: _*))) - }.map(kv => kv.getKey -> kv.getValue) + async + .flatMap(c => F.delay(c.blpop(timeout.toSecondsOrZero, keys: _*))) + .futureLift + .map(kv => kv.getKey -> kv.getValue) override def brPop(timeout: Duration, keys: K*): F[(K, V)] = - JRFuture { - async.flatMap(c => F.delay(c.brpop(timeout.toSecondsOrZero, keys: _*))) - }.map(kv => kv.getKey -> kv.getValue) + async + .flatMap(c => F.delay(c.brpop(timeout.toSecondsOrZero, keys: _*))) + .futureLift + .map(kv => kv.getKey -> kv.getValue) override def brPopLPush(timeout: Duration, source: K, destination: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.brpoplpush(timeout.toSecondsOrZero, source, destination))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.brpoplpush(timeout.toSecondsOrZero, source, destination))) + .futureLift + .map(Option.apply) override def lPop(key: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.lpop(key))) - }.map(Option.apply) + async + .flatMap(c 
=> F.delay(c.lpop(key))) + .futureLift + .map(Option.apply) override def lPush(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.lpush(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.lpush(key, values: _*))).futureLift.void override def lPushX(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.lpushx(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.lpushx(key, values: _*))).futureLift.void override def rPop(key: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.rpop(key))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.rpop(key))) + .futureLift + .map(Option.apply) override def rPopLPush(source: K, destination: K): F[Option[V]] = - JRFuture { - async.flatMap(c => F.delay(c.rpoplpush(source, destination))) - }.map(Option.apply) + async + .flatMap(c => F.delay(c.rpoplpush(source, destination))) + .futureLift + .map(Option.apply) override def rPush(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.rpush(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.rpush(key, values: _*))).futureLift.void override def rPushX(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.rpushx(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.rpushx(key, values: _*))).futureLift.void override def lInsertAfter(key: K, pivot: V, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.linsert(key, false, pivot, value))) - }.void + async.flatMap(c => F.delay(c.linsert(key, false, pivot, value))).futureLift.void override def lInsertBefore(key: K, pivot: V, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.linsert(key, true, pivot, value))) - }.void + async.flatMap(c => F.delay(c.linsert(key, true, pivot, value))).futureLift.void override def lRem(key: K, count: Long, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.lrem(key, count, value))) - }.void + async.flatMap(c => F.delay(c.lrem(key, count, value))).futureLift.void override def lSet(key: K, index: Long, value: V): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.lset(key, index, value))) - }.void + async.flatMap(c => F.delay(c.lset(key, index, value))).futureLift.void override def lTrim(key: K, start: Long, stop: Long): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.ltrim(key, start, stop))) - }.void + async.flatMap(c => F.delay(c.ltrim(key, start, stop))).futureLift.void /******************************* Geo API **********************************/ override def geoDist(key: K, from: V, to: V, unit: GeoArgs.Unit): F[Double] = - JRFuture { - async.flatMap(c => F.delay(c.geodist(key, from, to, unit))) - }.map(x => Double.box(x)) + async + .flatMap(c => F.delay(c.geodist(key, from, to, unit))) + .futureLift + .map(x => Double.box(x)) override def geoHash(key: K, values: V*): F[List[Option[String]]] = - JRFuture { - async.flatMap(c => F.delay(c.geohash(key, values: _*))) - }.map(_.asScala.toList.map(x => Option(x.getValue))) + async + .flatMap(c => F.delay(c.geohash(key, values: _*))) + .futureLift + .map(_.asScala.toList.map(x => Option(x.getValue))) override def geoPos(key: K, values: V*): F[List[GeoCoordinate]] = - JRFuture { - async.flatMap(c => F.delay(c.geopos(key, values: _*))) - }.map(_.asScala.toList.map(c => GeoCoordinate(c.getX.doubleValue(), c.getY.doubleValue()))) + async + .flatMap(c => F.delay(c.geopos(key, values: _*))) + .futureLift + .map(_.asScala.toList.map(c => GeoCoordinate(c.getX.doubleValue(), c.getY.doubleValue()))) override def 
geoRadius(key: K, geoRadius: GeoRadius, unit: GeoArgs.Unit): F[Set[V]] = - JRFuture { - async.flatMap(c => - F.delay(c.georadius(key, geoRadius.lon.value, geoRadius.lat.value, geoRadius.dist.value, unit)) - ) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.georadius(key, geoRadius.lon.value, geoRadius.lat.value, geoRadius.dist.value, unit))) + .futureLift + .map(_.asScala.toSet) override def geoRadius(key: K, geoRadius: GeoRadius, unit: GeoArgs.Unit, args: GeoArgs): F[List[GeoRadiusResult[V]]] = - JRFuture { - async.flatMap(c => + async + .flatMap(c => F.delay(c.georadius(key, geoRadius.lon.value, geoRadius.lat.value, geoRadius.dist.value, unit, args)) ) - }.map(_.asScala.toList.map(_.asGeoRadiusResult)) + .futureLift + .map(_.asScala.toList.map(_.asGeoRadiusResult)) override def geoRadiusByMember(key: K, value: V, dist: Distance, unit: GeoArgs.Unit): F[Set[V]] = - JRFuture { - async.flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit))) - }.map(_.asScala.toSet) + async + .flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit))) + .futureLift + .map(_.asScala.toSet) override def geoRadiusByMember( key: K, @@ -673,49 +713,49 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( unit: GeoArgs.Unit, args: GeoArgs ): F[List[GeoRadiusResult[V]]] = - JRFuture { - async.flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit, args))) - }.map(_.asScala.toList.map(_.asGeoRadiusResult)) - - override def geoAdd(key: K, geoValues: GeoLocation[V]*): F[Unit] = - JRFuture { - val triplets = geoValues.flatMap(g => Seq[Any](g.lon.value, g.lat.value, g.value)).asInstanceOf[Seq[AnyRef]] - async.flatMap(c => F.delay(c.geoadd(key, triplets: _*))) - }.void + async + .flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit, args))) + .futureLift + .map(_.asScala.toList.map(_.asGeoRadiusResult)) + + override def geoAdd(key: K, geoValues: GeoLocation[V]*): F[Unit] = { + val triplets = geoValues.flatMap(g => Seq[Any](g.lon.value, g.lat.value, g.value)).asInstanceOf[Seq[AnyRef]] + async.flatMap(c => F.delay(c.geoadd(key, triplets: _*))).futureLift.void + } override def geoRadius(key: K, geoRadius: GeoRadius, unit: GeoArgs.Unit, storage: GeoRadiusKeyStorage[K]): F[Unit] = - JRFuture { - conn.async - .flatMap { c => - F.delay { - c.georadius( - key, - geoRadius.lon.value, - geoRadius.lat.value, - geoRadius.dist.value, - unit, - storage.asGeoRadiusStoreArgs - ) - } + conn.async + .flatMap { c => + F.delay { + c.georadius( + key, + geoRadius.lon.value, + geoRadius.lat.value, + geoRadius.dist.value, + unit, + storage.asGeoRadiusStoreArgs + ) } - }.void + } + .futureLift + .void override def geoRadius(key: K, geoRadius: GeoRadius, unit: GeoArgs.Unit, storage: GeoRadiusDistStorage[K]): F[Unit] = - JRFuture { - conn.async - .flatMap { c => - F.delay { - c.georadius( - key, - geoRadius.lon.value, - geoRadius.lat.value, - geoRadius.dist.value, - unit, - storage.asGeoRadiusStoreArgs - ) - } + conn.async + .flatMap { c => + F.delay { + c.georadius( + key, + geoRadius.lon.value, + geoRadius.lat.value, + geoRadius.dist.value, + unit, + storage.asGeoRadiusStoreArgs + ) } - }.void + } + .futureLift + .void override def geoRadiusByMember( key: K, @@ -724,9 +764,10 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( unit: GeoArgs.Unit, storage: GeoRadiusKeyStorage[K] ): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit, storage.asGeoRadiusStoreArgs))) - }.void + 
async + .flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit, storage.asGeoRadiusStoreArgs))) + .futureLift + .void override def geoRadiusByMember( key: K, @@ -735,249 +776,248 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( unit: GeoArgs.Unit, storage: GeoRadiusDistStorage[K] ): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit, storage.asGeoRadiusStoreArgs))) - }.void + async + .flatMap(c => F.delay(c.georadiusbymember(key, value, dist.value, unit, storage.asGeoRadiusStoreArgs))) + .futureLift + .void /******************************* Sorted Sets API **********************************/ - override def zAdd(key: K, args: Option[ZAddArgs], values: ScoreWithValue[V]*): F[Unit] = - JRFuture { - args match { - case Some(x) => - async.flatMap(c => F.delay(c.zadd(key, x, values.map(s => ScoredValue.just(s.score.value, s.value)): _*))) - case None => - async.flatMap(c => F.delay(c.zadd(key, values.map(s => ScoredValue.just(s.score.value, s.value)): _*))) - } - }.void + override def zAdd(key: K, args: Option[ZAddArgs], values: ScoreWithValue[V]*): F[Unit] = { + val res = args match { + case Some(x) => + async.flatMap(c => F.delay(c.zadd(key, x, values.map(s => ScoredValue.just(s.score.value, s.value)): _*))) + case None => + async.flatMap(c => F.delay(c.zadd(key, values.map(s => ScoredValue.just(s.score.value, s.value)): _*))) + } + res.futureLift.void + } - override def zAddIncr(key: K, args: Option[ZAddArgs], member: ScoreWithValue[V]): F[Unit] = - JRFuture { - args match { - case Some(x) => async.flatMap(c => F.delay(c.zaddincr(key, x, member.score.value, member.value))) - case None => async.flatMap(c => F.delay(c.zaddincr(key, member.score.value, member.value))) - } - }.void + override def zAddIncr(key: K, args: Option[ZAddArgs], member: ScoreWithValue[V]): F[Unit] = { + val res = args match { + case Some(x) => async.flatMap(c => F.delay(c.zaddincr(key, x, member.score.value, member.value))) + case None => async.flatMap(c => F.delay(c.zaddincr(key, member.score.value, member.value))) + } + res.futureLift.void + } override def zIncrBy(key: K, member: V, amount: Double): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.zincrby(key, amount, member))) - }.void - - override def zInterStore(destination: K, args: Option[ZStoreArgs], keys: K*): F[Unit] = - JRFuture { - args match { - case Some(x) => async.flatMap(c => F.delay(c.zinterstore(destination, x, keys: _*))) - case None => async.flatMap(c => F.delay(c.zinterstore(destination, keys: _*))) - } - }.void + async.flatMap(c => F.delay(c.zincrby(key, amount, member))).futureLift.void + + override def zInterStore(destination: K, args: Option[ZStoreArgs], keys: K*): F[Unit] = { + val res = args match { + case Some(x) => async.flatMap(c => F.delay(c.zinterstore(destination, x, keys: _*))) + case None => async.flatMap(c => F.delay(c.zinterstore(destination, keys: _*))) + } + res.futureLift.void + } override def zRem(key: K, values: V*): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.zrem(key, values: _*))) - }.void + async.flatMap(c => F.delay(c.zrem(key, values: _*))).futureLift.void override def zRemRangeByLex(key: K, range: ZRange[V]): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.zremrangebylex(key, JRange.create[V](range.start, range.end)))) - }.void + async.flatMap(c => F.delay(c.zremrangebylex(key, JRange.create[V](range.start, range.end)))).futureLift.void override def zRemRangeByRank(key: K, start: Long, stop: Long): F[Unit] = - 
JRFuture { - async.flatMap(c => F.delay(c.zremrangebyrank(key, start, stop))) - }.void + async.flatMap(c => F.delay(c.zremrangebyrank(key, start, stop))).futureLift.void override def zRemRangeByScore(key: K, range: ZRange[V])(implicit ev: Numeric[V]): F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.zremrangebyscore(key, range.asJavaRange))) - }.void - - override def zUnionStore(destination: K, args: Option[ZStoreArgs], keys: K*): F[Unit] = - JRFuture { - args match { - case Some(x) => async.flatMap(c => F.delay(c.zunionstore(destination, x, keys: _*))) - case None => async.flatMap(c => F.delay(c.zunionstore(destination, keys: _*))) - } - }.void + async.flatMap(c => F.delay(c.zremrangebyscore(key, range.asJavaRange))).futureLift.void + + override def zUnionStore(destination: K, args: Option[ZStoreArgs], keys: K*): F[Unit] = { + val res = args match { + case Some(x) => async.flatMap(c => F.delay(c.zunionstore(destination, x, keys: _*))) + case None => async.flatMap(c => F.delay(c.zunionstore(destination, keys: _*))) + } + res.futureLift.void + } override def zCard(key: K): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.zcard(key))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.zcard(key))) + .futureLift + .map(x => Option(Long.unbox(x))) override def zCount(key: K, range: ZRange[V])(implicit ev: Numeric[V]): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.zcount(key, range.asJavaRange))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.zcount(key, range.asJavaRange))) + .futureLift + .map(x => Option(Long.unbox(x))) override def zLexCount(key: K, range: ZRange[V]): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.zlexcount(key, JRange.create[V](range.start, range.end)))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.zlexcount(key, JRange.create[V](range.start, range.end)))) + .futureLift + .map(x => Option(Long.unbox(x))) override def zRange(key: K, start: Long, stop: Long): F[List[V]] = - JRFuture { - async.flatMap(c => F.delay(c.zrange(key, start, stop))) - }.map(_.asScala.toList) - - override def zRangeByLex(key: K, range: ZRange[V], limit: Option[RangeLimit]): F[List[V]] = - JRFuture { - limit match { - case Some(x) => - async.flatMap(c => - F.delay(c.zrangebylex(key, JRange.create[V](range.start, range.end), JLimit.create(x.offset, x.count))) - ) - case None => async.flatMap(c => F.delay(c.zrangebylex(key, JRange.create[V](range.start, range.end)))) - } - }.map(_.asScala.toList) - - override def zRangeByScore[T: Numeric](key: K, range: ZRange[T], limit: Option[RangeLimit]): F[List[V]] = - JRFuture { - limit match { - case Some(x) => - async.flatMap(c => F.delay(c.zrangebyscore(key, range.asJavaRange, JLimit.create(x.offset, x.count)))) - case None => async.flatMap(c => F.delay(c.zrangebyscore(key, range.asJavaRange))) - } - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.zrange(key, start, stop))) + .futureLift + .map(_.asScala.toList) + + override def zRangeByLex(key: K, range: ZRange[V], limit: Option[RangeLimit]): F[List[V]] = { + val res = limit match { + case Some(x) => + async.flatMap(c => + F.delay(c.zrangebylex(key, JRange.create[V](range.start, range.end), JLimit.create(x.offset, x.count))) + ) + case None => async.flatMap(c => F.delay(c.zrangebylex(key, JRange.create[V](range.start, range.end)))) + } + res.futureLift.map(_.asScala.toList) + } + + override def zRangeByScore[T: Numeric](key: K, range: ZRange[T], limit: Option[RangeLimit]): F[List[V]] = { 
+ val res = limit match { + case Some(x) => + async.flatMap(c => F.delay(c.zrangebyscore(key, range.asJavaRange, JLimit.create(x.offset, x.count)))) + case None => async.flatMap(c => F.delay(c.zrangebyscore(key, range.asJavaRange))) + } + res.futureLift.map(_.asScala.toList) + } override def zRangeByScoreWithScores[T: Numeric]( key: K, range: ZRange[T], limit: Option[RangeLimit] - ): F[List[ScoreWithValue[V]]] = - JRFuture { - limit match { - case Some(x) => - async.flatMap(c => - F.delay(c.zrangebyscoreWithScores(key, range.asJavaRange, JLimit.create(x.offset, x.count))) - ) - case None => async.flatMap(c => F.delay(c.zrangebyscoreWithScores(key, range.asJavaRange))) - } - }.map(_.asScala.toList.map(_.asScoreWithValues)) + ): F[List[ScoreWithValue[V]]] = { + val res = limit match { + case Some(x) => + async.flatMap(c => F.delay(c.zrangebyscoreWithScores(key, range.asJavaRange, JLimit.create(x.offset, x.count)))) + case None => async.flatMap(c => F.delay(c.zrangebyscoreWithScores(key, range.asJavaRange))) + } + res.futureLift.map(_.asScala.toList.map(_.asScoreWithValues)) + } override def zRangeWithScores(key: K, start: Long, stop: Long): F[List[ScoreWithValue[V]]] = - JRFuture { - async.flatMap(c => F.delay(c.zrangeWithScores(key, start, stop))) - }.map(_.asScala.toList.map(_.asScoreWithValues)) + async + .flatMap(c => F.delay(c.zrangeWithScores(key, start, stop))) + .futureLift + .map(_.asScala.toList.map(_.asScoreWithValues)) override def zRank(key: K, value: V): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.zrank(key, value))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.zrank(key, value))) + .futureLift + .map(x => Option(Long.unbox(x))) override def zRevRange(key: K, start: Long, stop: Long): F[List[V]] = - JRFuture { - async.flatMap(c => F.delay(c.zrevrange(key, start, stop))) - }.map(_.asScala.toList) - - override def zRevRangeByLex(key: K, range: ZRange[V], limit: Option[RangeLimit]): F[List[V]] = - JRFuture { - limit match { - case Some(x) => - async.flatMap(c => - F.delay(c.zrevrangebylex(key, JRange.create[V](range.start, range.end), JLimit.create(x.offset, x.count))) - ) - case None => async.flatMap(c => F.delay(c.zrevrangebylex(key, JRange.create[V](range.start, range.end)))) - } - }.map(_.asScala.toList) - - override def zRevRangeByScore[T: Numeric](key: K, range: ZRange[T], limit: Option[RangeLimit]): F[List[V]] = - JRFuture { - limit match { - case Some(x) => - async.flatMap(c => F.delay(c.zrevrangebyscore(key, range.asJavaRange, JLimit.create(x.offset, x.count)))) - case None => async.flatMap(c => F.delay(c.zrevrangebyscore(key, range.asJavaRange))) - } - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.zrevrange(key, start, stop))) + .futureLift + .map(_.asScala.toList) + + override def zRevRangeByLex(key: K, range: ZRange[V], limit: Option[RangeLimit]): F[List[V]] = { + val res = limit match { + case Some(x) => + async.flatMap(c => + F.delay(c.zrevrangebylex(key, JRange.create[V](range.start, range.end), JLimit.create(x.offset, x.count))) + ) + case None => async.flatMap(c => F.delay(c.zrevrangebylex(key, JRange.create[V](range.start, range.end)))) + } + res.futureLift.map(_.asScala.toList) + } + + override def zRevRangeByScore[T: Numeric](key: K, range: ZRange[T], limit: Option[RangeLimit]): F[List[V]] = { + val res = limit match { + case Some(x) => + async.flatMap(c => F.delay(c.zrevrangebyscore(key, range.asJavaRange, JLimit.create(x.offset, x.count)))) + case None => async.flatMap(c => F.delay(c.zrevrangebyscore(key, 
range.asJavaRange))) + } + res.futureLift.map(_.asScala.toList) + } override def zRevRangeByScoreWithScores[T: Numeric]( key: K, range: ZRange[T], limit: Option[RangeLimit] - ): F[List[ScoreWithValue[V]]] = - JRFuture { - limit match { - case Some(x) => - async.flatMap(c => - F.delay(c.zrevrangebyscoreWithScores(key, range.asJavaRange, JLimit.create(x.offset, x.count))) - ) - case None => async.flatMap(c => F.delay(c.zrangebyscoreWithScores(key, range.asJavaRange))) - } - }.map(_.asScala.toList.map(_.asScoreWithValues)) + ): F[List[ScoreWithValue[V]]] = { + val res = limit match { + case Some(x) => + async.flatMap(c => + F.delay(c.zrevrangebyscoreWithScores(key, range.asJavaRange, JLimit.create(x.offset, x.count))) + ) + case None => async.flatMap(c => F.delay(c.zrangebyscoreWithScores(key, range.asJavaRange))) + } + res.futureLift.map(_.asScala.toList.map(_.asScoreWithValues)) + } override def zRevRangeWithScores(key: K, start: Long, stop: Long): F[List[ScoreWithValue[V]]] = - JRFuture { - async.flatMap(c => F.delay(c.zrevrangeWithScores(key, start, stop))) - }.map(_.asScala.toList.map(_.asScoreWithValues)) + async + .flatMap(c => F.delay(c.zrevrangeWithScores(key, start, stop))) + .futureLift + .map(_.asScala.toList.map(_.asScoreWithValues)) override def zRevRank(key: K, value: V): F[Option[Long]] = - JRFuture { - async.flatMap(c => F.delay(c.zrevrank(key, value))) - }.map(x => Option(Long.unbox(x))) + async + .flatMap(c => F.delay(c.zrevrank(key, value))) + .futureLift + .map(x => Option(Long.unbox(x))) override def zScore(key: K, value: V): F[Option[Double]] = - JRFuture { - async.flatMap(c => F.delay(c.zscore(key, value))) - }.map(x => Option(Double.unbox(x))) + async + .flatMap(c => F.delay(c.zscore(key, value))) + .futureLift + .map(x => Option(Double.unbox(x))) /******************************* Connection API **********************************/ override val ping: F[String] = - JRFuture { - async.flatMap(c => F.delay(c.ping())) - } + async.flatMap(c => F.delay(c.ping())).futureLift /******************************* Server API **********************************/ override val flushAll: F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.flushall())) - }.void + async.flatMap(c => F.delay(c.flushall())).futureLift.void override val flushAllAsync: F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.flushallAsync())) - }.void + async.flatMap(c => F.delay(c.flushallAsync())).futureLift.void override def keys(key: K): F[List[K]] = - JRFuture { - async.flatMap(c => F.delay(c.keys(key))) - }.map(_.asScala.toList) + async + .flatMap(c => F.delay(c.keys(key))) + .futureLift + .map(_.asScala.toList) override def info: F[Map[String, String]] = - JRFuture { - async.flatMap(c => F.delay(c.info)) - }.flatMap(info => - F.delay( - info - .split("\\r?\\n") - .toList - .map(_.split(":", 2).toList) - .collect { case k :: v :: Nil => (k, v) } - .toMap + async + .flatMap(c => F.delay(c.info)) + .futureLift + .flatMap(info => + F.delay( + info + .split("\\r?\\n") + .toList + .map(_.split(":", 2).toList) + .collect { case k :: v :: Nil => (k, v) } + .toMap + ) ) - ) override def dbsize: F[Long] = - JRFuture { - async.flatMap(c => F.delay(c.dbsize)) - }.map(Long.unbox) + async + .flatMap(c => F.delay(c.dbsize)) + .futureLift + .map(Long.unbox) override def lastSave: F[Instant] = - JRFuture { - async.flatMap(c => F.delay(c.lastsave)) - }.map(_.toInstant) + async + .flatMap(c => F.delay(c.lastsave)) + .futureLift + .map(_.toInstant) override def slowLogLen: F[Long] = - JRFuture { - async.flatMap(c => 
F.delay(c.slowlogLen)) - }.map(Long.unbox) + async + .flatMap(c => F.delay(c.slowlogLen)) + .futureLift + .map(Long.unbox) override def eval(script: String, output: ScriptOutputType[V]): F[output.R] = - JRFuture { - async.flatMap(c => F.delay(c.eval[output.Underlying](script, output.outputType))) - }.map(r => output.convert(r)) + async + .flatMap(c => F.delay(c.eval[output.Underlying](script, output.outputType))) + .futureLift + .map(r => output.convert(r)) override def eval(script: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] = - JRFuture { - async.flatMap(c => + async + .flatMap(c => F.delay( c.eval[output.Underlying]( script, @@ -988,11 +1028,12 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( ) ) ) - }.map(output.convert(_)) + .futureLift + .map(output.convert(_)) override def eval(script: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R] = - JRFuture { - async.flatMap(c => + async + .flatMap(c => F.delay( c.eval[output.Underlying]( script, @@ -1003,16 +1044,18 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( ) ) ) - }.map(output.convert(_)) + .futureLift + .map(output.convert(_)) override def evalSha(script: String, output: ScriptOutputType[V]): F[output.R] = - JRFuture { - async.flatMap(c => F.delay(c.evalsha[output.Underlying](script, output.outputType))) - }.map(output.convert(_)) + async + .flatMap(c => F.delay(c.evalsha[output.Underlying](script, output.outputType))) + .futureLift + .map(output.convert(_)) override def evalSha(script: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] = - JRFuture { - async.flatMap(c => + async + .flatMap(c => F.delay( c.evalsha[output.Underlying]( script, @@ -1022,11 +1065,12 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( ) ) ) - }.map(output.convert(_)) + .futureLift + .map(output.convert(_)) override def evalSha(script: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R] = - JRFuture { - async.flatMap(c => + async + .flatMap(c => F.delay( c.evalsha[output.Underlying]( script, @@ -1037,22 +1081,20 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( ) ) ) - }.map(output.convert(_)) + .futureLift + .map(output.convert(_)) override def scriptLoad(script: V): F[String] = - JRFuture { - async.flatMap(c => F.delay(c.scriptLoad(script))) - } + async.flatMap(c => F.delay(c.scriptLoad(script))).futureLift override def scriptExists(digests: String*): F[List[Boolean]] = - JRFuture { - async.flatMap(c => F.delay(c.scriptExists(digests: _*))) - }.map(_.asScala.map(Boolean.unbox(_)).toList) + async + .flatMap(c => F.delay(c.scriptExists(digests: _*))) + .futureLift + .map(_.asScala.map(Boolean.unbox(_)).toList) override def scriptFlush: F[Unit] = - JRFuture { - async.flatMap(c => F.delay(c.scriptFlush())) - }.void + async.flatMap(c => F.delay(c.scriptFlush())).futureLift.void } private[redis4cats] trait RedisConversionOps { @@ -1115,9 +1157,11 @@ private[redis4cats] trait RedisConversionOps { } private[redis4cats] class Redis[F[_]: Concurrent: ContextShift, K, V]( - connection: RedisStatefulConnection[F, K, V] -) extends BaseRedis[F, K, V](connection, cluster = false) + connection: RedisStatefulConnection[F, K, V], + blocker: Blocker +) extends BaseRedis[F, K, V](connection, cluster = false, blocker) private[redis4cats] class RedisCluster[F[_]: Concurrent: ContextShift, K, V]( - connection: RedisStatefulClusterConnection[F, K, V] -) extends 
BaseRedis[F, K, V](connection, cluster = true) + connection: RedisStatefulClusterConnection[F, K, V], + blocker: Blocker +) extends BaseRedis[F, K, V](connection, cluster = true, blocker) diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala index 8476c4ea..f6b258cc 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala @@ -17,15 +17,22 @@ package dev.profunktor.redis4cats import cats.effect._ -import cats.effect.concurrent.Ref +import cats.effect.concurrent._ import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.algebra._ import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace object transactions { - case class RedisTransaction[F[_]: Concurrent: Log, K, V]( + sealed trait TransactionError extends NoStackTrace + case object TransactionAborted extends TransactionError + case object TransactionDiscarded extends TransactionError + + case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] ) { @@ -39,25 +46,54 @@ object transactions { * It should not be used to run other computations, only Redis commands. Fail to do so * may end in unexpected results such as a dead lock. */ - def run(commands: F[Any]*): F[Unit] = - Ref.of[F, List[Fiber[F, Any]]](List.empty).flatMap { fibers => - val tx = - Resource.makeCase(cmd.multi) { - case (_, ExitCase.Completed) => - cmd.exec *> F.info("Transaction completed") - case (_, ExitCase.Error(e)) => - cmd.discard *> F.error(s"Transaction failed: ${e.getMessage}") - case (_, ExitCase.Canceled) => - cmd.discard *> F.error("Transaction canceled") + def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = + Deferred[F, Either[Throwable, w.R]].flatMap { promise => + // Forks every command in order + def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = + ys match { + case HNil => F.pure(res) + case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) + } + + // Joins or cancel fibers correspondent to previous executed commands + def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = + ys match { + case HNil => F.pure(res) + case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => + h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) + case HCons((h: Fiber[F, Any] @unchecked), t) => + h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) + case HCons(h, t) => + F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) } - val cancelFibers = - fibers.get.flatMap(_.traverse(_.cancel).void) + def cancelFibers(fibs: HList, err: Throwable = TransactionAborted): F[Unit] = + joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) - F.info("Transaction started") *> - tx.use(_ => commands.toList.traverse(_.start).flatMap(fibers.set)) - .guarantee(cancelFibers) + val tx = + Resource.makeCase(cmd.multi >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info("Transaction completed") + _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e) >> F.raiseError(e)) + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- 
promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"Transaction failed: ${e.getMessage}") >> + cmd.discard.guarantee(cancelFibers(fibs)) + case ((fibs: HList), ExitCase.Canceled) => + F.error("Transaction canceled") >> + cmd.discard.guarantee(cancelFibers(fibs)) + case _ => + F.error("Kernel panic: the impossible happened!") + } + + F.info("Transaction started") >> + (tx.use(_ => F.unit) >> promise.get.rethrow).timeout(3.seconds) } + } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala index c7d8b9cb..ccd541f2 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala @@ -53,10 +53,13 @@ object RedisClusterTransactionsDemo extends LoggerIOApp { } yield nodeCmd // Transactions are only supported on a single node - val notAllowed = - cmd.multi.bracket(_ => cmd.set(key1, "nope") *> cmd.exec)(_ => cmd.discard).handleErrorWith { - case e: OperationNotSupported => putStrLn(e) - } + val notAllowed: IO[Unit] = + cmd.multi + .bracket(_ => cmd.set(key1, "nope") >> cmd.exec.void)(_ => cmd.discard) + .handleErrorWith { + case e: OperationNotSupported => putStrLn(e) + } + .void notAllowed *> // Transaction runs in a single shard, where "key1" is stored @@ -65,7 +68,7 @@ object RedisClusterTransactionsDemo extends LoggerIOApp { val getter = cmd.get(key1).flatTap(showResult(key1)) - val tx1 = tx.run(cmd.set(key1, "foo")) + val tx1 = putStrLn(tx) //.run(cmd.set(key1, "foo")) getter *> tx1 *> getter.void } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterReplicaStringsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterReplicaStringsDemo.scala index 8f5d10f9..797848f3 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterReplicaStringsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterReplicaStringsDemo.scala @@ -17,9 +17,9 @@ package dev.profunktor.redis4cats import cats.effect.{ IO, Resource } +import dev.profunktor.redis4cats.algebra.RedisCommands import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.domain.ReadFrom -import dev.profunktor.redis4cats.connection.RedisMasterReplica import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis @@ -33,28 +33,28 @@ object RedisMasterReplicaStringsDemo extends LoggerIOApp { val showResult: Option[String] => IO[Unit] = _.fold(putStrLn(s"Not found key: $usernameKey"))(s => putStrLn(s)) - val connection: Resource[IO, RedisMasterReplica[String, String]] = - Resource.liftF(RedisURI.make[IO](redisURI)).flatMap { uri => - RedisMasterReplica[IO, String, String](stringCodec, uri)(Some(ReadFrom.MasterPreferred)) - } + val connection: Resource[IO, RedisCommands[IO, String, String]] = + for { + uri <- Resource.liftF(RedisURI.make[IO](redisURI)) + conn <- RedisMasterReplica[IO, String, String](stringCodec, uri)(Some(ReadFrom.MasterPreferred)) + cmds <- Redis.masterReplica[IO, String, String](conn) + } yield cmds connection - .use { conn => - Redis.masterReplica[IO, String, String](conn).flatMap { cmd => - for { - x <- cmd.get(usernameKey) - _ <- showResult(x) - _ <- cmd.set(usernameKey, "some value") - y <- cmd.get(usernameKey) - _ <- 
showResult(y) - _ <- cmd.setNx(usernameKey, "should not happen") - w <- cmd.get(usernameKey) - _ <- showResult(w) - _ <- cmd.del(usernameKey) - z <- cmd.get(usernameKey) - _ <- showResult(z) - } yield () - } + .use { cmd => + for { + x <- cmd.get(usernameKey) + _ <- showResult(x) + _ <- cmd.set(usernameKey, "some value") + y <- cmd.get(usernameKey) + _ <- showResult(y) + _ <- cmd.setNx(usernameKey, "should not happen") + w <- cmd.get(usernameKey) + _ <- showResult(w) + _ <- cmd.del(usernameKey) + z <- cmd.get(usernameKey) + _ <- showResult(z) + } yield () } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala index 618539e6..c595bf59 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala @@ -21,8 +21,10 @@ import cats.implicits._ import dev.profunktor.redis4cats.algebra.RedisCommands import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.interpreter.Redis import dev.profunktor.redis4cats.transactions._ +import java.util.concurrent.TimeoutException object RedisTransactionsDemo extends LoggerIOApp { @@ -33,7 +35,7 @@ object RedisTransactionsDemo extends LoggerIOApp { val key2 = "test2" val showResult: String => Option[String] => IO[Unit] = key => - _.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s)) + _.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s")) val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = for { @@ -50,12 +52,30 @@ object RedisTransactionsDemo extends LoggerIOApp { cmd.get(key1).flatTap(showResult(key1)) *> cmd.get(key2).flatTap(showResult(key2)) - val tx1 = tx.run( - cmd.set(key1, "foo"), - cmd.set(key2, "bar") - ) + // the type is fully inferred but you can be explicit if you'd like - getters *> tx1 *> getters.void + //type Cmd = IO[Unit] :: IO[Unit] :: IO[Option[String]] :: IO[Unit] :: IO[Unit] :: IO[Option[String]] :: HNil + val operations = + cmd.set(key1, "sad") :: cmd.set(key2, "windows") :: cmd.get(key1) :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + + //type Res = Unit :: Unit :: Option[String] :: Unit :: Unit :: Option[String] :: HNil + val prog = + tx.exec(operations) + .flatMap { + case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => + putStrLn(s"res1: $res1, res2: $res2") + } + .onError { + case TransactionAborted => + putStrLn("[Error] - Transaction Aborted") + case TransactionDiscarded => + putStrLn("[Error] - Transaction Discarded") + case _: TimeoutException => + putStrLn("[Error] - Timeout") + } + + getters >> prog >> getters >> putStrLn("keep doing stuff...") } } diff --git a/modules/log4cats/src/main/scala/dev/profunktor/redis4cats/log4cats.scala b/modules/log4cats/src/main/scala/dev/profunktor/redis4cats/log4cats.scala index 2ab08af2..640c5f46 100644 --- a/modules/log4cats/src/main/scala/dev/profunktor/redis4cats/log4cats.scala +++ b/modules/log4cats/src/main/scala/dev/profunktor/redis4cats/log4cats.scala @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 ProfunKtor + * Copyright 2018-2020 ProfunKtor * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
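
The new transactional API above leans on two pieces whose definitions live elsewhere in this changeset: the `.futureLift` syntax (presumably provided by the `JRFuture` companion in `dev.profunktor.redis4cats.effect`, whose `JRFuture._` import appears in several files here) and the `HList`/`Witness` machinery from `dev.profunktor.redis4cats.hlist._`. Neither definition is shown in this diff, so the sketch below is only an illustrative reduction of the idea under assumed names and signatures, not the library's actual code; in particular, the real change also threads a `Blocker` through these calls to shift continuations off the event loop, and the real HList encoding additionally provides the `::` and `~:` syntax used in the examples.

```scala
import cats.effect._
import cats.implicits._
import java.util.concurrent.CompletableFuture

object Sketches {

  // Illustrative only: the kind of lifting `.futureLift` abbreviates above,
  // turning an effect that yields a CompletableFuture into a plain F[A].
  def futureLift[F[_]: Concurrent, A](fcf: F[CompletableFuture[A]]): F[A] =
    fcf.flatMap { cf =>
      Concurrent[F].async[A] { cb =>
        cf.whenComplete((a: A, err: Throwable) => if (err ne null) cb(Left(err)) else cb(Right(a)))
        ()
      }
    }

  // Illustrative only: the idea behind `Witness.Aux[T, R]` is to map an HList of
  // effects `F[A1] :: ... :: HNil` to an HList of their results `A1 :: ... :: HNil`,
  // which is why `tx.exec(commands)` can return fully typed results.
  sealed trait HList
  final case class HCons[+H, +T <: HList](head: H, tail: T) extends HList
  case object HNil extends HList

  trait Witness[T <: HList] { type R <: HList }
  object Witness {
    type Aux[T <: HList, R0 <: HList] = Witness[T] { type R = R0 }

    implicit val nil: Aux[HNil.type, HNil.type] =
      new Witness[HNil.type] { type R = HNil.type }

    implicit def cons[F[_], A, T <: HList](implicit w: Witness[T]): Aux[HCons[F[A], T], HCons[A, w.R]] =
      new Witness[HCons[F[A], T]] { type R = HCons[A, w.R] }
  }
}
```
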
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubCommands.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubCommands.scala index de444233..89edfd11 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubCommands.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubCommands.scala @@ -30,12 +30,13 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection class LivePubSubCommands[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( state: Ref[F, PubSubState[F, K, V]], subConnection: StatefulRedisPubSubConnection[K, V], - pubConnection: StatefulRedisPubSubConnection[K, V] + pubConnection: StatefulRedisPubSubConnection[K, V], + blocker: Blocker ) extends PubSubCommands[Stream[F, *], K, V] { private[redis4cats] val subCommands: SubscribeCommands[Stream[F, *], K, V] = - new Subscriber[F, K, V](state, subConnection) - private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection) + new Subscriber[F, K, V](state, subConnection, blocker) + private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection, blocker) override def subscribe(channel: RedisChannel[K]): Stream[F, V] = subCommands.subscribe(channel) @@ -47,7 +48,7 @@ class LivePubSubCommands[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( _.evalMap { message => state.get.flatMap { st => PubSubInternals[F, K, V](state, subConnection).apply(channel)(st) *> - JRFuture(F.delay(pubConnection.async().publish(channel.underlying, message))) + JRFuture(F.delay(pubConnection.async().publish(channel.underlying, message)))(blocker) }.void } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubStats.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubStats.scala index dc1ad7bf..b1fefde6 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubStats.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/LivePubSubStats.scala @@ -28,13 +28,14 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection import dev.profunktor.redis4cats.JavaConversions._ class LivePubSubStats[F[_]: Concurrent: ContextShift, K, V]( - pubConnection: StatefulRedisPubSubConnection[K, V] + pubConnection: StatefulRedisPubSubConnection[K, V], + blocker: Blocker ) extends PubSubStats[Stream[F, *], K] { override def pubSubChannels: Stream[F, List[K]] = Stream .eval { - JRFuture(F.delay(pubConnection.async().pubsubChannels())) + JRFuture(F.delay(pubConnection.async().pubsubChannels()))(blocker) } .map(_.asScala.toList) @@ -43,7 +44,7 @@ class LivePubSubStats[F[_]: Concurrent: ContextShift, K, V]( override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] = Stream.eval { - JRFuture(F.delay(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))).flatMap { kv => + JRFuture(F.delay(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*)))(blocker).flatMap { kv => F.delay(kv.asScala.toList.map { case (k, n) => Subscription(RedisChannel[K](k), n) }) } } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/PubSub.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/PubSub.scala index 3c8bfa00..d39b7c7a 100644 --- 
a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/PubSub.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/PubSub.scala @@ -23,6 +23,7 @@ import dev.profunktor.redis4cats.algebra.{ PubSubCommands, PublishCommands, Subs import dev.profunktor.redis4cats.domain._ import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.effect.{ JRFuture, Log } +import dev.profunktor.redis4cats.effect.JRFuture._ import fs2.Stream import fs2.concurrent.Topic import io.lettuce.core.pubsub.StatefulRedisPubSubConnection @@ -31,15 +32,16 @@ object PubSub { private[redis4cats] def acquireAndRelease[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( client: RedisClient, - codec: RedisCodec[K, V] + codec: RedisCodec[K, V], + blocker: Blocker ): (F[StatefulRedisPubSubConnection[K, V]], StatefulRedisPubSubConnection[K, V] => F[Unit]) = { - val acquire: F[StatefulRedisPubSubConnection[K, V]] = JRFuture.fromConnectionFuture { + val acquire: F[StatefulRedisPubSubConnection[K, V]] = JRFuture.fromConnectionFuture( F.delay(client.underlying.connectPubSubAsync(codec.underlying, client.uri.underlying)) - } + )(blocker) val release: StatefulRedisPubSubConnection[K, V] => F[Unit] = c => - JRFuture.fromCompletableFuture(F.delay(c.closeAsync())) *> + JRFuture.fromCompletableFuture(F.delay(c.closeAsync()))(blocker) *> F.info(s"Releasing PubSub connection: ${client.uri.underlying}") (acquire, release) @@ -53,17 +55,18 @@ object PubSub { def mkPubSubConnection[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( client: RedisClient, codec: RedisCodec[K, V] - ): Stream[F, PubSubCommands[Stream[F, *], K, V]] = { - val (acquire, release) = acquireAndRelease[F, K, V](client, codec) - // One exclusive connection for subscriptions and another connection for publishing / stats - for { - state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])) - sConn <- Stream.bracket(acquire)(release) - pConn <- Stream.bracket(acquire)(release) - subs <- Stream.emit(new LivePubSubCommands[F, K, V](state, sConn, pConn)) - } yield subs + ): Stream[F, PubSubCommands[Stream[F, *], K, V]] = + Stream.resource(mkBlocker[F]).flatMap { blocker => + val (acquire, release) = acquireAndRelease[F, K, V](client, codec, blocker) + // One exclusive connection for subscriptions and another connection for publishing / stats + for { + state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])) + sConn <- Stream.bracket(acquire)(release) + pConn <- Stream.bracket(acquire)(release) + subs <- Stream.emit(new LivePubSubCommands[F, K, V](state, sConn, pConn, blocker)) + } yield subs - } + } /** * Creates a PubSub connection. @@ -73,10 +76,11 @@ object PubSub { def mkPublisherConnection[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( client: RedisClient, codec: RedisCodec[K, V] - ): Stream[F, PublishCommands[Stream[F, *], K, V]] = { - val (acquire, release) = acquireAndRelease[F, K, V](client, codec) - Stream.bracket(acquire)(release).map(c => new Publisher[F, K, V](c)) - } + ): Stream[F, PublishCommands[Stream[F, *], K, V]] = + Stream.resource(mkBlocker[F]).flatMap { blocker => + val (acquire, release) = acquireAndRelease[F, K, V](client, codec, blocker) + Stream.bracket(acquire)(release).map(c => new Publisher[F, K, V](c, blocker)) + } /** * Creates a PubSub connection. 
@@ -86,11 +90,12 @@ object PubSub { def mkSubscriberConnection[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( client: RedisClient, codec: RedisCodec[K, V] - ): Stream[F, SubscribeCommands[Stream[F, *], K, V]] = { - val (acquire, release) = acquireAndRelease[F, K, V](client, codec) - Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])).flatMap { st => - Stream.bracket(acquire)(release).map(new Subscriber(st, _)) + ): Stream[F, SubscribeCommands[Stream[F, *], K, V]] = + Stream.resource(mkBlocker[F]).flatMap { blocker => + val (acquire, release) = acquireAndRelease[F, K, V](client, codec, blocker) + Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])).flatMap { st => + Stream.bracket(acquire)(release).map(new Subscriber(st, _, blocker)) + } } - } } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Publisher.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Publisher.scala index b8af6cd1..58625706 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Publisher.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Publisher.scala @@ -25,13 +25,15 @@ import dev.profunktor.redis4cats.effect.JRFuture import fs2.Stream import io.lettuce.core.pubsub.StatefulRedisPubSubConnection -class Publisher[F[_]: ConcurrentEffect: ContextShift, K, V](pubConnection: StatefulRedisPubSubConnection[K, V]) - extends PublishCommands[Stream[F, *], K, V] { +class Publisher[F[_]: ConcurrentEffect: ContextShift, K, V]( + pubConnection: StatefulRedisPubSubConnection[K, V], + blocker: Blocker +) extends PublishCommands[Stream[F, *], K, V] { - private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection) + private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection, blocker) override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] = - _.evalMap(message => JRFuture(F.delay(pubConnection.async().publish(channel.underlying, message))).void) + _.evalMap(message => JRFuture(F.delay(pubConnection.async().publish(channel.underlying, message)))(blocker).void) override def pubSubChannels: Stream[F, List[K]] = pubSubStats.pubSubChannels diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Subscriber.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Subscriber.scala index fa98c10d..8fc30364 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Subscriber.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/pubsub/Subscriber.scala @@ -29,7 +29,8 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection class Subscriber[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( state: Ref[F, PubSubState[F, K, V]], - subConnection: StatefulRedisPubSubConnection[K, V] + subConnection: StatefulRedisPubSubConnection[K, V], + blocker: Blocker ) extends SubscribeCommands[Stream[F, *], K, V] { override def subscribe(channel: RedisChannel[K]): Stream[F, V] = @@ -37,16 +38,17 @@ class Subscriber[F[_]: ConcurrentEffect: ContextShift: Log, K, V]( .eval( state.get.flatMap { st => PubSubInternals[F, K, V](state, subConnection).apply(channel)(st) <* - JRFuture(F.delay(subConnection.async().subscribe(channel.underlying))) + JRFuture(F.delay(subConnection.async().subscribe(channel.underlying)))(blocker) } ) .flatMap(_.subscribe(500).unNone) override def unsubscribe(channel: 
RedisChannel[K]): Stream[F, Unit] = Stream.eval { - JRFuture(F.delay(subConnection.async().unsubscribe(channel.underlying))).void.guarantee(state.get.flatMap { st => - st.get(channel.underlying).fold(().pure)(_.publish1(none[V])) *> state.update(_ - channel.underlying) - }) + JRFuture(F.delay(subConnection.async().unsubscribe(channel.underlying)))(blocker).void + .guarantee(state.get.flatMap { st => + st.get(channel.underlying).fold(().pure)(_.publish1(none[V])) *> state.update(_ - channel.underlying) + }) } } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2RawStreaming.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2RawStreaming.scala index 03c96694..b8892030 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2RawStreaming.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2RawStreaming.scala @@ -27,19 +27,20 @@ import io.lettuce.core.api.StatefulRedisConnection import dev.profunktor.redis4cats.JavaConversions._ private[streams] class RedisRawStreaming[F[_]: Concurrent: ContextShift, K, V]( - val client: StatefulRedisConnection[K, V] + val client: StatefulRedisConnection[K, V], + blocker: Blocker ) extends RawStreaming[F, K, V] { override def xAdd(key: K, body: Map[K, V]): F[MessageId] = JRFuture { F.delay(client.async().xadd(key, body.asJava)) - }.map(MessageId) + }(blocker).map(MessageId) override def xRead(streams: Set[StreamingOffset[K]]): F[List[StreamingMessageWithId[K, V]]] = { val offsets = streams.map(s => StreamOffset.from(s.key, s.offset)).toSeq JRFuture { F.delay(client.async().xread(offsets: _*)) - }.map { list => + }(blocker).map { list => list.asScala.toList.map { msg => StreamingMessageWithId[K, V](MessageId(msg.getId), msg.getStream, msg.getBody.asScala.toMap) } diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2Streaming.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2Streaming.scala index 88c95814..03e1d010 100644 --- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2Streaming.scala +++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/interpreter/streams/Fs2Streaming.scala @@ -25,6 +25,7 @@ import dev.profunktor.redis4cats.connection.RedisMasterReplica import dev.profunktor.redis4cats.domain._ import dev.profunktor.redis4cats.connection.{ RedisClient, RedisURI } import dev.profunktor.redis4cats.effect.{ JRFuture, Log } +import dev.profunktor.redis4cats.effect.JRFuture._ import dev.profunktor.redis4cats.streams._ import fs2.Stream import io.lettuce.core.{ ReadFrom => JReadFrom } @@ -34,26 +35,29 @@ object RedisStream { def mkStreamingConnection[F[_]: Concurrent: ContextShift: Log, K, V]( client: RedisClient, codec: RedisCodec[K, V] - ): Stream[F, Streaming[Stream[F, *], K, V]] = { - val acquire = JRFuture - .fromConnectionFuture { - F.delay(client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying)) - } - .map(new RedisRawStreaming(_)) + ): Stream[F, Streaming[Stream[F, *], K, V]] = + Stream.resource(mkBlocker[F]).flatMap { blocker => + val acquire = JRFuture + .fromConnectionFuture(F.delay(client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying)))( + blocker + ) + .map(new RedisRawStreaming(_, blocker)) - val release: RedisRawStreaming[F, K, V] => F[Unit] = c => - JRFuture.fromCompletableFuture(F.delay(c.client.closeAsync())) *> - 
F.info(s"Releasing Streaming connection: ${client.uri.underlying}") + val release: RedisRawStreaming[F, K, V] => F[Unit] = c => + JRFuture.fromCompletableFuture(F.delay(c.client.closeAsync()))(blocker) *> + F.info(s"Releasing Streaming connection: ${client.uri.underlying}") - Stream.bracket(acquire)(release).map(rs => new RedisStream(rs)) - } + Stream.bracket(acquire)(release).map(rs => new RedisStream(rs)) + } def mkMasterReplicaConnection[F[_]: Concurrent: ContextShift: Log, K, V]( codec: RedisCodec[K, V], uris: RedisURI* )(readFrom: Option[JReadFrom] = None): Stream[F, Streaming[Stream[F, *], K, V]] = - Stream.resource(RedisMasterReplica[F, K, V](codec, uris: _*)(readFrom)).map { conn => - new RedisStream(new RedisRawStreaming(conn.underlying)) + Stream.resource(mkBlocker[F]).flatMap { blocker => + Stream.resource(RedisMasterReplica[F, K, V](codec, uris: _*)(readFrom)).map { conn => + new RedisStream(new RedisRawStreaming(conn.underlying, blocker)) + } } } diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala index 2e310ece..7ba82431 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala @@ -23,6 +23,7 @@ import cats.implicits._ import dev.profunktor.redis4cats.algebra._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.effects._ +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.transactions._ import io.lettuce.core.GeoArgs import scala.concurrent.duration._ @@ -255,23 +256,29 @@ trait TestScenarios { val tx = RedisTransaction(cmd) - for { - _ <- tx.run(cmd.set(key1, "foo"), cmd.set(key2, "bar")) - x <- cmd.get(key1) - _ <- IO(assert(x.contains("foo"))) - y <- cmd.get(key2) - _ <- IO(assert(y.contains("bar"))) - } yield () + val operations = + cmd.set(key1, "osx") :: cmd.set(key2, "windows") :: cmd.get(key1) :: cmd.sIsMember("foo", "bar") :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + + tx.exec(operations).map { + case _ ~: _ ~: res1 ~: res2 ~: _ ~: _ ~: res3 ~: HNil => + assert(res1.contains("osx")) + assert(res2 === false) + assert(res3.contains("nix")) + case tr => + assert(false, s"Unexpected result: $tr") + } } def canceledTransactionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { val tx = RedisTransaction(cmd) - val commands = (1 to 10000).toList.map(x => cmd.set(s"tx-$x", s"v$x")) + val commands = + cmd.set(s"tx-1", s"v1") :: cmd.set(s"tx-2", s"v2") :: cmd.set(s"tx-3", s"v3") :: HNil // Transaction should be canceled - IO.race(tx.run(commands: _*), IO.unit) >> + IO.race(tx.exec(commands), IO.unit) >> cmd.get("tx-1").map(x => assert(x.isEmpty)) // no keys written } diff --git a/site/docs/effects/index.md b/site/docs/effects/index.md index 39987966..48bd3a03 100644 --- a/site/docs/effects/index.md +++ b/site/docs/effects/index.md @@ -48,7 +48,7 @@ import io.chrisdavenport.log4cats.Logger import io.chrisdavenport.log4cats.slf4j.Slf4jLogger implicit val cs = IO.contextShift(scala.concurrent.ExecutionContext.global) -implicit val logger: Logger[IO] = Slf4jLogger.unsafeCreate[IO] +implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8 @@ -100,18 +100,18 @@ import dev.profunktor.redis4cats.connection.RedisMasterReplica import dev.profunktor.redis4cats.interpreter.Redis import 
dev.profunktor.redis4cats.domain.{ ReadFrom}
 
-// Already Imported Above, but if copying from this block is necessary
+// Already imported above, but shown here in case you are copying only this block
 // val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8
 
-val connection: Resource[IO, RedisMasterReplica[String, String]] =
-  Resource.liftF(RedisURI.make[IO]("redis://localhost")).flatMap { uri =>
-    RedisMasterReplica[IO, String, String](stringCodec, uri)(Some(ReadFrom.MasterPreferred))
-  }
+val commands: Resource[IO, StringCommands[IO, String, String]] =
+  for {
+    uri <- Resource.liftF(RedisURI.make[IO]("redis://localhost"))
+    conn <- RedisMasterReplica[IO, String, String](stringCodec, uri)(Some(ReadFrom.MasterPreferred))
+    cmds <- Redis.masterReplica[IO, String, String](conn)
+  } yield cmds
 
-connection.use { conn =>
-  Redis.masterReplica[IO, String, String](conn).flatMap { cmd =>
-    cmd.set("foo", "123") >> IO.unit // do something
-  }
+commands.use { cmd =>
+  cmd.set("foo", "123") >> IO.unit // do something
 }
 ```
diff --git a/site/docs/transactions.md b/site/docs/transactions.md
index 237facec..2ba46ec5 100644
--- a/site/docs/transactions.md
+++ b/site/docs/transactions.md
@@ -7,17 +7,20 @@ position: 3
 
 # Transactions
 
-Redis supports [transactions](https://redis.io/topics/transactions) via the `MULTI`, `EXEC` and `DISCARD` commands. `redis4cats` provides a `RedisTransaction` utility that models a transaction as a resource via the primitive `bracketCase`.
+Redis supports [transactions](https://redis.io/topics/transactions) via the `MULTI`, `EXEC` and `DISCARD` commands. `redis4cats` provides a `RedisTransaction` utility that models a transaction as a `Resource`.
 
-- `acquire`: begin transaction
-- `use`: send transactional commands
-- `release`: either commit on success or rollback on failure / cancellation.
+Note that every command has to be forked (`.start`) because the commands need to be sent to the server asynchronously and no response will be received until either an `EXEC` or a `DISCARD` command is sent. Both forking and sending the final command are handled by `RedisTransaction`.
+
+These are internals, though. All you need to care about is what commands you want to run as part of a transaction
+and how to handle the possible errors and retry logic.
 
 ### Working with transactions
 
-The most common way is to create a `RedisTransaction` once by passing the commands API as a parameter and invoke the `run` function every time you want to run the given commands as part of a new transaction.
+The most common way is to create a `RedisTransaction` once by passing the commands API as a parameter and invoke the `exec` function every time you want to run the given commands as part of a new transaction.
+
+Every command has to be atomic and independent of previous Redis results, so it is not recommended to chain commands using `flatMap`.
 
-Note that every command has to be forked (`.start`) because the commands need to be sent to the server asynchronously and no response will be received until either an `EXEC` or a `DISCARD` command is sent. Both forking and sending the final command is handled by `RedisTransaction`. Also, it is not possible to sequence commands (`flatMap`) that are part of a transaction. Every command has to be atomic and independent of previous results.
+Below you can find a first example of transactional commands.
```scala mdoc:invisible import cats.effect.{IO, Resource} @@ -27,9 +30,11 @@ import dev.profunktor.redis4cats.domain._ import dev.profunktor.redis4cats.log4cats._ import io.chrisdavenport.log4cats.Logger import io.chrisdavenport.log4cats.slf4j.Slf4jLogger +import scala.concurrent.ExecutionContext -implicit val cs = IO.contextShift(scala.concurrent.ExecutionContext.global) -implicit val logger: Logger[IO] = Slf4jLogger.unsafeCreate[IO] +implicit val cs = IO.contextShift(ExecutionContext.global) +implicit val timer = IO.timer(ExecutionContext.global) +implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = { Redis[IO, String, String](null, null.asInstanceOf[RedisCodec[String, String]]) @@ -40,7 +45,9 @@ val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = { import cats.effect.IO import cats.implicits._ import dev.profunktor.redis4cats._ +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.transactions._ +import java.util.concurrent.TimeoutException def putStrLn(str: String): IO[Unit] = IO(println(str)) @@ -57,18 +64,41 @@ commandsApi.use { cmd => // RedisCommands[IO, String, String] cmd.get(key1).flatTap(showResult(key1)) *> cmd.get(key2).flatTap(showResult(key2)) - val setters = tx.run( - cmd.set(key1, "foo"), - cmd.set(key2, "bar"), - // .. more transactional commands .. - ) - - getters *> setters *> getters.void + // the commands type is fully inferred + // IO[Unit] :: IO[Option[String]] :: IO[Unit] :: HNil + val commands = cmd.set(key1, "foo") :: cmd.get(key1) :: cmd.set(key2, "bar") :: HNil + + // the result type is inferred as well + // Unit :: Option[String] :: Unit :: HNil + val prog = + tx.exec(commands) + .flatMap { + case _ ~: res1 ~: _ ~: HNil => + putStrLn(s"Key1 result: $res1") + } + .onError { + case TransactionAborted => + putStrLn("[Error] - Transaction Aborted") + case TransactionDiscarded => + putStrLn("[Error] - Transaction Discarded") + case _: TimeoutException => + putStrLn("[Error] - Timeout") + } + + getters >> prog >> getters.void } ``` It should be exclusively used to run Redis commands as part of a transaction, not any other computations. Fail to do so, may result in unexpected behavior. +Transactional commands may be discarded if something went wrong in between. The possible errors you may get are: + +- `TransactionDiscarded`: The `EXEC` command failed and the transactional commands were discarded. +- `TransactionAborted`: The `DISCARD` command was triggered due to cancellation or other failure within the transaction. +- `TimeoutException`: The transaction timed out due to some unknown error. + +### How NOT to use transactions + For example, the following transaction will result in a dead-lock: ```scala mdoc:silent @@ -79,19 +109,17 @@ commandsApi.use { cmd => cmd.get(key1).flatTap(showResult(key1)) *> cmd.get(key2).flatTap(showResult(key2)) - val setters = tx.run( - cmd.set(key1, "foo"), - cmd.set(key2, "bar"), - cmd.discard + val setters = tx.exec( + cmd.set(key1, "foo") :: cmd.set(key2, "bar") :: cmd.discard :: HNil ) - getters *> setters *> getters.void + getters *> setters.void *> getters.void } ``` You should never pass a transactional command: `MULTI`, `EXEC` or `DISCARD`. -The following example will result in a successful transaction; the error will be swallowed: +The following example will result in a successful transaction on Redis. Yet, the operation will end up raising the error passed as a command. 
```scala mdoc:silent commandsApi.use { cmd => @@ -101,12 +129,10 @@ commandsApi.use { cmd => cmd.get(key1).flatTap(showResult(key1)) *> cmd.get(key2).flatTap(showResult(key2)) - val failedTx = tx.run( - cmd.set(key1, "foo"), - cmd.set(key2, "bar"), - IO.raiseError(new Exception("boom")) + val failedTx = tx.exec( + cmd.set(key1, "foo") :: cmd.set(key2, "bar") :: IO.raiseError(new Exception("boom")) :: HNil ) - getters *> failedTx *> getters.void + getters *> failedTx.void *> getters.void } ```
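
The documentation above mentions handling "the possible errors and retry logic" but the examples stop at logging the failures. A minimal sketch of one way to retry a discarded transaction, assuming the `commandsApi` setup and error types from the examples above; the `retryOnDiscard` helper and the key names are illustrative, not part of the library:

```scala
import cats.effect.IO
import dev.profunktor.redis4cats._
import dev.profunktor.redis4cats.hlist._
import dev.profunktor.redis4cats.transactions._

// Illustrative helper: re-run a transactional program a bounded number of
// times when the EXEC step failed and the commands were discarded.
def retryOnDiscard[A](fa: IO[A], retries: Int): IO[A] =
  fa.handleErrorWith {
    case TransactionDiscarded if retries > 0 => retryOnDiscard(fa, retries - 1)
    case other                               => IO.raiseError(other)
  }

commandsApi.use { cmd =>
  val tx       = RedisTransaction(cmd)
  val commands = cmd.set("foo", "123") :: cmd.get("foo") :: HNil

  // Give up after three discarded attempts; other errors are re-raised as-is.
  retryOnDiscard(tx.exec(commands).void, retries = 3)
}
```
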