Skip to content

Commit

Permalink
Merge pull request #297 from profunktor/feature/client-options
Browse files Browse the repository at this point in the history
[Feature] - Client options
  • Loading branch information
gvolpe authored May 15, 2020
2 parents 917f903 + cb2d34d commit 3c2679a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ import cats.effect._
import cats.implicits._
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effect.JRFuture._
import io.lettuce.core.{ RedisClient => JRedisClient, RedisURI => JRedisURI }
import io.lettuce.core.{ ClientOptions, RedisClient => JRedisClient, RedisURI => JRedisURI }

sealed abstract case class RedisClient private (underlying: JRedisClient, uri: RedisURI)

object RedisClient {

private[redis4cats] def acquireAndRelease[F[_]: Concurrent: ContextShift: Log](
uri: => RedisURI,
opts: ClientOptions,
blocker: Blocker
): (F[RedisClient], RedisClient => F[Unit]) = {
val acquire: F[RedisClient] = F.delay {
val jClient: JRedisClient = JRedisClient.create(uri.underlying)
jClient.setOptions(opts)
new RedisClient(jClient, uri) {}
}

Expand All @@ -43,13 +45,21 @@ object RedisClient {
}

private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: Concurrent: ContextShift: Log](
opts: ClientOptions,
blocker: Blocker
): F[(F[RedisClient], RedisClient => F[Unit])] =
F.delay(RedisURI.fromUnderlying(new JRedisURI())).map(uri => acquireAndRelease(uri, blocker))
F.delay(RedisURI.fromUnderlying(new JRedisURI()))
.map(uri => acquireAndRelease(uri, opts, blocker))

def apply[F[_]: Concurrent: ContextShift: Log](uri: => RedisURI): Resource[F, RedisClient] =
Resource.liftF(F.delay(ClientOptions.create())).flatMap(apply[F](uri, _))

def apply[F[_]: Concurrent: ContextShift: Log](
uri: => RedisURI,
opts: ClientOptions
): Resource[F, RedisClient] =
mkBlocker[F].flatMap { blocker =>
val (acquire, release) = acquireAndRelease(uri, blocker)
val (acquire, release) = acquireAndRelease(uri, opts, blocker)
Resource.make(acquire)(release)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effect.JRFuture._
import io.lettuce.core.masterreplica.{ MasterReplica, StatefulRedisMasterReplicaConnection }
import io.lettuce.core.{ ReadFrom => JReadFrom }
import io.lettuce.core.{ ClientOptions, ReadFrom => JReadFrom }

/**
* It encapsulates an underlying `MasterReplica` connection
Expand Down Expand Up @@ -66,7 +66,8 @@ object RedisMasterReplica {
/**
* Creates a [[RedisMasterReplica]]
*
* It will also create an underlying [[RedisClient]] to establish connection with Redis
* It will also create an underlying [[RedisClient]] with default client options to
* establish connection with Redis.
*
* Example:
*
Expand All @@ -80,16 +81,41 @@ object RedisMasterReplica {
def make[K, V](
codec: RedisCodec[K, V],
uris: RedisURI*
)(readFrom: Option[JReadFrom] = None): Resource[F, RedisMasterReplica[K, V]] =
Resource.liftF(F.delay(ClientOptions.create())).flatMap(withOptions(codec, _, uris: _*)(readFrom))

/**
* Creates a [[RedisMasterReplica]] using the supplied client options
*
* It will also create an underlying [[RedisClient]] using the supplied client options
* to establish connection with Redis.
*
* Example:
*
* {{{
* val conn: Resource[IO, RedisMasterReplica[String, String]] =
* for {
* ops <- Resource.liftF(F.delay(ClientOptions.create()))
* uri <- Resource.liftF(RedisURI.make[IO](redisURI))
* mrc <- RedisMasterReplica[IO].withOptions(RedisCodec.Utf8, ops, uri)(Some(ReadFrom.MasterPreferred))
* } yield mrc
* }}}
*/
def withOptions[K, V](
codec: RedisCodec[K, V],
opts: ClientOptions,
uris: RedisURI*
)(readFrom: Option[JReadFrom] = None): Resource[F, RedisMasterReplica[K, V]] =
mkBlocker[F].flatMap { blocker =>
Resource.liftF(RedisClient.acquireAndReleaseWithoutUri[F](blocker)).flatMap {
Resource.liftF(RedisClient.acquireAndReleaseWithoutUri[F](opts, blocker)).flatMap {
case (acquireClient, releaseClient) =>
Resource.make(acquireClient)(releaseClient).flatMap { client =>
val (acquire, release) = acquireAndRelease(client, codec, readFrom, blocker, uris: _*)
Resource.make(acquire)(release)
}
}
}

}

def apply[F[_]: Concurrent: ContextShift: Log]: MasterReplicaPartiallyApplied[F] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands }
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import io.lettuce.core.ClientOptions

object Redis {

Expand Down Expand Up @@ -109,7 +110,7 @@ object Redis {
/**
* Creates a [[RedisCommands]] for a single-node connection.
*
* It will create an underlying RedisClient to establish a
* It will create an underlying RedisClient with default options to establish
* connection with Redis.
*
* Example:
Expand All @@ -128,11 +129,40 @@ object Redis {
redis <- this.fromClient(client, codec)
} yield redis

/**
* Creates a [[RedisCommands]] for a single-node connection.
*
* It will create an underlying RedisClient using the supplied client options
* to establish connection with Redis.
*
* Example:
*
* {{{
* for {
* opts <- Resource.liftF(F.delay(ClientOptions.create())) // configure timeouts, etc
* cmds <- Redis[IO].withOptions("redis://localhost", opts, RedisCodec.Ascii)
* } yield cmds
* }}}
*
* Note: if you need to create multiple connections, use [[fromClient]]
* instead, which allows you to re-use the same client.
*/
def withOptions[K, V](
uri: String,
opts: ClientOptions,
codec: RedisCodec[K, V]
): Resource[F, RedisCommands[F, K, V]] =
for {
redisUri <- Resource.liftF(RedisURI.make[F](uri))
client <- RedisClient[F](redisUri, opts)
redis <- this.fromClient(client, codec)
} yield redis

/**
* Creates a [[RedisCommands]] for a single-node connection to deal
* with UTF-8 encoded keys and values.
*
* It will also create an underlying RedisClient to establish a
* It will create an underlying RedisClient with default options to establish
* connection with Redis.
*
* Example:
Expand Down Expand Up @@ -176,7 +206,7 @@ object Redis {
/**
* Creates a [[RedisCommands]] for a cluster connection.
*
* It will also create an underlying RedisClusterClient to establish a
* It will also create an underlying RedisClusterClient to establish
* connection with Redis.
*
* Example:
Expand Down Expand Up @@ -206,7 +236,7 @@ object Redis {
* Creates a [[RedisCommands]] for a cluster connection to deal
* with UTF-8 encoded keys and values.
*
* It will also create an underlying RedisClusterClient to establish a
* It will also create an underlying RedisClusterClient to establish
* connection with Redis.
*
* Example:
Expand Down
39 changes: 39 additions & 0 deletions site/docs/effects/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,36 @@ The only difference with other APIs will be the `Commands` type. For the `String

Doing it this way, you can share the same `RedisClient` to establish many different connections. If your use case is simple, have a look at the section below.

#### Client configuration

When you create a `RedisClient`, it will use sane defaults for timeouts, auto-reconnection, etc. These defaults can be customized by providing a `io.lettuce.core.ClientOptions` as well as the `RedisURI`.

```scala mdoc:silent
import io.lettuce.core.{ ClientOptions, TimeoutOptions }
import java.time.Duration

val mkOpts: IO[ClientOptions] =
IO {
ClientOptions.builder()
.autoReconnect(false)
.pingBeforeActivateConnection(false)
.timeoutOptions(
TimeoutOptions.builder()
.fixedTimeout(Duration.ofSeconds(10))
.build()
)
.build()
}

val api: Resource[IO, StringCommands[IO, String, String]] =
for {
uri <- Resource.liftF(RedisURI.make[IO]("redis://localhost"))
opts <- Resource.liftF(mkOpts)
client <- RedisClient[IO](uri, opts)
redis <- Redis[IO].fromClient(client, stringCodec)
} yield redis
```

### Single node connection

For those who only need a simple API access to Redis commands, there are a few ways to acquire a connection:
Expand All @@ -89,6 +119,15 @@ val simpleApi: Resource[IO, StringCommands[IO, String, String]] =
Redis[IO].simple("redis://localhost", RedisCodec.Ascii)
```

A simple connection with custom client options:

```scala mdoc:silent
val simpleOptsApi: Resource[IO, StringCommands[IO, String, String]] =
Resource.liftF(IO(ClientOptions.create())).flatMap { opts =>
Redis[IO].withOptions("redis://localhost", opts, RedisCodec.Ascii)
}
```

Or the most common one:

```scala mdoc:silent
Expand Down

0 comments on commit 3c2679a

Please sign in to comment.