Skip to content

Commit

Permalink
Merge pull request #273 from profunktor/feature/async-transactions-wi…
Browse files Browse the repository at this point in the history
…th-hlists

Typed transactions using a custom HList
  • Loading branch information
gvolpe authored May 7, 2020
2 parents 0d09ef7 + b4f5e3c commit 439f0d2
Show file tree
Hide file tree
Showing 25 changed files with 1,121 additions and 783 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ redis4cats

Redis client built on top of [Cats Effect](https://typelevel.org/cats-effect/), [Fs2](http://fs2.io/) and the async Java client [Lettuce](https://lettuce.io/).

> **NOTE**: Neither binary compatibility nor API stability are guaranteed between releases.
> **NOTE**: Neither binary compatibility nor API stability will be guaranteed until we reach `1.0.0`.
`redis4cats` defines two types of API: an effect-based using [Cats Effect](https://typelevel.org/cats-effect/) and a stream-based using [Fs2](http://fs2.io/).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import cats.effect._
import cats.syntax.apply._
import cats.syntax.functor._
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effect.JRFuture._
import io.lettuce.core.{ RedisClient => JRedisClient, RedisURI => JRedisURI }

sealed abstract case class RedisClient private (underlying: JRedisClient, uri: RedisURI)

object RedisClient {

private[redis4cats] def acquireAndRelease[F[_]: Concurrent: ContextShift: Log](
uri: => RedisURI
uri: => RedisURI,
blocker: Blocker
): (F[RedisClient], RedisClient => F[Unit]) = {
val acquire: F[RedisClient] = F.delay {
val jClient: JRedisClient = JRedisClient.create(uri.underlying)
Expand All @@ -36,19 +38,21 @@ object RedisClient {

val release: RedisClient => F[Unit] = client =>
F.info(s"Releasing Redis connection: $uri") *>
JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync())).void
JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync()))(blocker).void

(acquire, release)
}

private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: Concurrent: ContextShift: Log]
: F[(F[RedisClient], RedisClient => F[Unit])] =
F.delay(RedisURI.fromUnderlying(new JRedisURI())).map(acquireAndRelease(_))
private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: Concurrent: ContextShift: Log](
blocker: Blocker
): F[(F[RedisClient], RedisClient => F[Unit])] =
F.delay(RedisURI.fromUnderlying(new JRedisURI())).map(uri => acquireAndRelease(uri, blocker))

def apply[F[_]: Concurrent: ContextShift: Log](uri: => RedisURI): Resource[F, RedisClient] = {
val (acquire, release) = acquireAndRelease(uri)
Resource.make(acquire)(release)
}
def apply[F[_]: Concurrent: ContextShift: Log](uri: => RedisURI): Resource[F, RedisClient] =
mkBlocker[F].flatMap { blocker =>
val (acquire, release) = acquireAndRelease(uri, blocker)
Resource.make(acquire)(release)
}

def fromUnderlyingWithUri(underlying: JRedisClient, uri: RedisURI): RedisClient =
new RedisClient(underlying, uri) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package dev.profunktor.redis4cats.connection

import cats.effect._
import cats.implicits._
import dev.profunktor.redis4cats.JavaConversions._
import dev.profunktor.redis4cats.domain.NodeId
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effect.JRFuture._
import io.lettuce.core.cluster.{ SlotHash, RedisClusterClient => JClusterClient }
import io.lettuce.core.cluster.models.partitions.{ Partitions => JPartitions }

import dev.profunktor.redis4cats.JavaConversions._

sealed abstract case class RedisClusterClient private (underlying: JClusterClient)

object RedisClusterClient {

private[redis4cats] def acquireAndRelease[F[_]: Concurrent: ContextShift: Log](
blocker: Blocker,
uri: RedisURI*
): (F[RedisClusterClient], RedisClusterClient => F[Unit]) = {

Expand All @@ -41,18 +42,19 @@ object RedisClusterClient {

val release: RedisClusterClient => F[Unit] = client =>
F.info(s"Releasing Redis Cluster client: ${client.underlying}") *>
JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync())).void
JRFuture.fromCompletableFuture(F.delay(client.underlying.shutdownAsync()))(blocker).void

(acquire, release)
}

private[redis4cats] def initializeClusterPartitions[F[_]: Sync](client: JClusterClient): F[Unit] =
F.delay(client.getPartitions).void

def apply[F[_]: Concurrent: ContextShift: Log](uri: RedisURI*): Resource[F, RedisClusterClient] = {
val (acquire, release) = acquireAndRelease(uri: _*)
Resource.make(acquire)(release)
}
def apply[F[_]: Concurrent: ContextShift: Log](uri: RedisURI*): Resource[F, RedisClusterClient] =
mkBlocker[F].flatMap { blocker =>
val (acquire, release) = acquireAndRelease(blocker, uri: _*)
Resource.make(acquire)(release)
}

def fromUnderlying(underlying: JClusterClient): RedisClusterClient =
new RedisClusterClient(underlying) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import dev.profunktor.redis4cats.domain.NodeId
import dev.profunktor.redis4cats.effect.JRFuture
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.api.sync.{ RedisCommands => RedisSyncCommands }
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands }
import scala.util.control.NoStackTrace

case class OperationNotSupported(value: String) extends NoStackTrace {
override def toString(): String = s"OperationNotSupported($value)"
}

private[redis4cats] trait RedisConnection[F[_], K, V] {
def sync: F[RedisSyncCommands[K, V]]
def clusterSync: F[RedisClusterSyncCommands[K, V]]
def async: F[RedisAsyncCommands[K, V]]
def clusterAsync: F[RedisClusterAsyncCommands[K, V]]
def close: F[Unit]
Expand All @@ -39,31 +43,42 @@ private[redis4cats] trait RedisConnection[F[_], K, V] {
}

private[redis4cats] class RedisStatefulConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisConnection[K, V]
conn: StatefulRedisConnection[K, V],
blocker: Blocker
) extends RedisConnection[F, K, V] {
def sync: F[RedisSyncCommands[K, V]] = F.delay(conn.sync())
def clusterSync: F[RedisClusterSyncCommands[K, V]] =
F.raiseError(OperationNotSupported("Running in a single node"))
def async: F[RedisAsyncCommands[K, V]] = F.delay(conn.async())
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
F.raiseError(OperationNotSupported("Running in a single node"))
def close: F[Unit] = JRFuture.fromCompletableFuture(F.delay(conn.closeAsync())).void
def close: F[Unit] = JRFuture.fromCompletableFuture(F.delay(conn.closeAsync()))(blocker).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
F.raiseError(OperationNotSupported("Running in a single node"))
def liftK[G[_]: Concurrent: ContextShift]: RedisConnection[G, K, V] = new RedisStatefulConnection[G, K, V](conn)
def liftK[G[_]: Concurrent: ContextShift]: RedisConnection[G, K, V] =
new RedisStatefulConnection[G, K, V](conn, blocker)
}

private[redis4cats] class RedisStatefulClusterConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisClusterConnection[K, V]
conn: StatefulRedisClusterConnection[K, V],
blocker: Blocker
) extends RedisConnection[F, K, V] {
def sync: F[RedisSyncCommands[K, V]] =
F.raiseError(
OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.")
)
def async: F[RedisAsyncCommands[K, V]] =
F.raiseError(
OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.")
)
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = F.delay(conn.async())
def clusterSync: F[RedisClusterSyncCommands[K, V]] = F.delay(conn.sync())
def close: F[Unit] =
JRFuture.fromCompletableFuture(F.delay(conn.closeAsync())).void
JRFuture.fromCompletableFuture(F.delay(conn.closeAsync()))(blocker).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
JRFuture.fromCompletableFuture(F.delay(conn.getConnectionAsync(nodeId.value))).flatMap { stateful =>
JRFuture.fromCompletableFuture(F.delay(conn.getConnectionAsync(nodeId.value)))(blocker).flatMap { stateful =>
F.delay(stateful.async())
}
def liftK[G[_]: Concurrent: ContextShift]: RedisConnection[G, K, V] =
new RedisStatefulClusterConnection[G, K, V](conn)
new RedisStatefulClusterConnection[G, K, V](conn, blocker)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package dev.profunktor.redis4cats.connection

import cats.effect._
import cats.syntax.all._
import dev.profunktor.redis4cats.JavaConversions._
import dev.profunktor.redis4cats.domain._
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effect.JRFuture._
import io.lettuce.core.masterreplica.{ MasterReplica, StatefulRedisMasterReplicaConnection }
import io.lettuce.core.{ ReadFrom => JReadFrom }

import dev.profunktor.redis4cats.JavaConversions._

sealed abstract case class RedisMasterReplica[K, V] private (underlying: StatefulRedisMasterReplicaConnection[K, V])

object RedisMasterReplica {
Expand All @@ -33,26 +33,27 @@ object RedisMasterReplica {
client: RedisClient,
codec: RedisCodec[K, V],
readFrom: Option[JReadFrom],
blocker: Blocker,
uris: RedisURI*
): (F[RedisMasterReplica[K, V]], RedisMasterReplica[K, V] => F[Unit]) = {

val acquire: F[RedisMasterReplica[K, V]] = {

val connection: F[RedisMasterReplica[K, V]] =
JRFuture
.fromCompletableFuture[F, StatefulRedisMasterReplicaConnection[K, V]] {
.fromCompletableFuture[F, StatefulRedisMasterReplicaConnection[K, V]](
F.delay {
MasterReplica.connectAsync[K, V](client.underlying, codec.underlying, uris.map(_.underlying).asJava)
}
}
)(blocker)
.map(new RedisMasterReplica(_) {})

readFrom.fold(connection)(rf => connection.flatMap(c => F.delay(c.underlying.setReadFrom(rf)) *> c.pure[F]))
}

val release: RedisMasterReplica[K, V] => F[Unit] = connection =>
F.info(s"Releasing Redis Master/Replica connection: ${connection.underlying}") *>
JRFuture.fromCompletableFuture(F.delay(connection.underlying.closeAsync())).void
JRFuture.fromCompletableFuture(F.delay(connection.underlying.closeAsync()))(blocker).void

(acquire, release)
}
Expand All @@ -61,12 +62,14 @@ object RedisMasterReplica {
codec: RedisCodec[K, V],
uris: RedisURI*
)(readFrom: Option[JReadFrom] = None): Resource[F, RedisMasterReplica[K, V]] =
Resource.liftF(RedisClient.acquireAndReleaseWithoutUri[F]).flatMap {
case (acquireClient, releaseClient) =>
Resource.make(acquireClient)(releaseClient).flatMap { client =>
val (acquire, release) = acquireAndRelease(client, codec, readFrom, uris: _*)
Resource.make(acquire)(release)
}
mkBlocker[F].flatMap { blocker =>
Resource.liftF(RedisClient.acquireAndReleaseWithoutUri[F](blocker)).flatMap {
case (acquireClient, releaseClient) =>
Resource.make(acquireClient)(releaseClient).flatMap { client =>
val (acquire, release) = acquireAndRelease(client, codec, readFrom, blocker, uris: _*)
Resource.make(acquire)(release)
}
}
}

def fromUnderlying[K, V](underlying: StatefulRedisMasterReplicaConnection[K, V]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,77 @@

package dev.profunktor.redis4cats.effect

import java.util.concurrent.{ CompletableFuture, CompletionStage, Future }

import cats.effect.{ Async, ContextShift }
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import cats.implicits._
import io.lettuce.core.{ ConnectionFuture, RedisFuture }
import java.util.concurrent._

trait RedisBlocker {
def ec: Blocker
}

object RedisBlocker {
def apply(blocker: Blocker): RedisBlocker =
new RedisBlocker {
def ec: Blocker = blocker
}
}

object JRFuture {

private[redis4cats] type JFuture[A] = CompletionStage[A] with Future[A]

def apply[F[_]: Async: ContextShift, A](fa: F[RedisFuture[A]]): F[A] =
liftJFuture[F, RedisFuture[A], A](fa)
private[redis4cats] def mkBlocker[F[_]: Sync]: Resource[F, Blocker] =
Blocker.fromExecutorService(F.delay(Executors.newFixedThreadPool(1)))

def apply[F[_]: Concurrent: ContextShift, A](
fa: F[RedisFuture[A]]
)(blocker: Blocker): F[A] =
liftJFuture[F, RedisFuture[A], A](fa)(blocker)

def fromConnectionFuture[F[_]: Concurrent: ContextShift, A](
fa: F[ConnectionFuture[A]]
)(blocker: Blocker): F[A] =
liftJFuture[F, ConnectionFuture[A], A](fa)(blocker)

def fromConnectionFuture[F[_]: Async: ContextShift, A](fa: F[ConnectionFuture[A]]): F[A] =
liftJFuture[F, ConnectionFuture[A], A](fa)
def fromCompletableFuture[F[_]: Concurrent: ContextShift, A](
fa: F[CompletableFuture[A]]
)(blocker: Blocker): F[A] =
liftJFuture[F, CompletableFuture[A], A](fa)(blocker)

def fromCompletableFuture[F[_]: Async: ContextShift, A](fa: F[CompletableFuture[A]]): F[A] =
liftJFuture[F, CompletableFuture[A], A](fa)
implicit class FutureLiftOps[F[_]: Concurrent: ContextShift, A](fa: F[RedisFuture[A]]) {
def futureLift(implicit rb: RedisBlocker): F[A] =
liftJFuture[F, RedisFuture[A], A](fa)(rb.ec)
}

private[redis4cats] def liftJFuture[
F[_]: Async: ContextShift,
F[_]: Concurrent: ContextShift,
G <: JFuture[A],
A
](fa: F[G]): F[A] =
fa.flatMap { f =>
F.async[A] { cb =>
f.handle[Unit] { (value: A, t: Throwable) =>
if (t != null) cb(Left(t))
else cb(Right(value))
](fa: F[G])(blocker: Blocker): F[A] = {
val lifted: F[A] = blocker.blockOn {
fa.flatMap { f =>
blocker.blockOn {
F.cancelable { cb =>
f.handle[Unit] { (res: A, err: Throwable) =>
err match {
case null =>
cb(Right(res))
case _: CancellationException =>
()
case ex: CompletionException if ex.getCause ne null =>
cb(Left(ex.getCause))
case ex =>
cb(Left(ex))
}
}
blocker.delay(f.cancel(true)).void
}
()
}
.guarantee(F.shift)
}
}
lifted.guarantee(F.shift)
}

}
Loading

0 comments on commit 439f0d2

Please sign in to comment.