Skip to content

Commit

Permalink
Merge pull request #92 from profunktor/refactor/rm-java-stuff
Browse files Browse the repository at this point in the history
[refactor] avoid exposing Java stuff to the user like codecs, uri, readfrom
  • Loading branch information
gvolpe authored May 14, 2019
2 parents cfa9aed + 26a8bed commit 5ed302d
Show file tree
Hide file tree
Showing 69 changed files with 94 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package dev.profunktor.redis4cats

import io.lettuce.core.{ RedisClient => JRedisClient }
import io.lettuce.core.{ RedisClient => JRedisClient, ReadFrom => JReadFrom }
import io.lettuce.core.cluster.{ RedisClusterClient => JClusterClient }
import io.lettuce.core.codec.{ RedisCodec => JRedisCodec, ToByteBufEncoder }
import io.lettuce.core.codec.{ RedisCodec => JRedisCodec, StringCodec, ToByteBufEncoder }
import io.lettuce.core.masterslave.StatefulRedisMasterSlaveConnection

object domain {
Expand All @@ -40,9 +40,9 @@ object domain {
extends RedisMasterSlaveConnection[K, V]

trait RedisChannel[K] {
def value: K
def underlying: K
}
case class LiveChannel[K](value: K) extends RedisChannel[K]
case class LiveChannel[K](underlying: K) extends RedisChannel[K]

type JCodec[K, V] = JRedisCodec[K, V] with ToByteBufEncoder[K, V]

Expand All @@ -51,4 +51,17 @@ object domain {
}
case class LiveRedisCodec[K, V](underlying: JCodec[K, V]) extends RedisCodec[K, V]

object RedisCodec {
val Ascii = LiveRedisCodec(StringCodec.ASCII)
val Utf8 = LiveRedisCodec(StringCodec.UTF8)
}

object ReadFrom {
val Master = JReadFrom.MASTER
val MasterPreferred = JReadFrom.MASTER_PREFERRED
val Nearest = JReadFrom.NEAREST
val Slave = JReadFrom.SLAVE
val SlavePreferred = JReadFrom.SLAVE_PREFERRED
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

package dev.profunktor.redis4cats.interpreter
import java.util.concurrent.TimeUnit

import cats.effect._
import cats.implicits._
Expand All @@ -25,13 +24,10 @@ import dev.profunktor.redis4cats.domain._
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effects._
import io.lettuce.core.{ Limit => JLimit, Range => JRange, RedisURI => JRedisURI }
import io.lettuce.core.{ GeoRadiusStoreArgs, GeoWithin, ScoredValue }
import io.lettuce.core.{ GeoArgs, GeoRadiusStoreArgs, GeoWithin, ScoredValue, ZAddArgs, ZStoreArgs }
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import io.lettuce.core.GeoArgs
import io.lettuce.core.ZAddArgs
import io.lettuce.core.ZStoreArgs

object Redis {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import dev.profunktor.redis4cats.effect.Log

object transactions {

case class RedisTransaction[F[_]: Log: Sync, K, V, A](
case class RedisTransaction[F[_]: Log: Sync, K, V](
cmd: RedisCommands[F, K, V]
) {
def run(fa: F[A]): F[A] =
def run[A](fa: F[A]): F[A] =
Log[F].info("Transaction started") *>
cmd.multi.bracketCase(_ => fa) {
case (_, ExitCase.Completed) => cmd.exec *> Log[F].info("Transaction completed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package dev.profunktor.redis4cats
import cats.effect.IO
import dev.profunktor.redis4cats.codecs.Codecs
import dev.profunktor.redis4cats.codecs.splits._
import dev.profunktor.redis4cats.domain.{ LiveRedisCodec, RedisCodec }
import io.lettuce.core.codec.StringCodec
import dev.profunktor.redis4cats.domain.RedisCodec

object Demo {

val redisURI: String = "redis://localhost"
val redisClusterURI: String = "redis://localhost:30001"
val stringCodec: RedisCodec[String, String] = LiveRedisCodec(StringCodec.UTF8)
val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8
val longCodec: RedisCodec[String, Long] = Codecs.derive[String, Long](stringCodec, stringLongEpi)

def putStrLn[A](a: A): IO[Unit] = IO(println(a))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package dev.profunktor.redis4cats

import cats.effect.{ IO, Resource }
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.domain.RedisMasterSlaveConnection
import dev.profunktor.redis4cats.domain.{ ReadFrom, RedisMasterSlaveConnection }
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis
import io.lettuce.core.{ ReadFrom => JReadFrom }

object Fs2RedisMasterSlaveStringsDemo extends LoggerIOApp {

Expand All @@ -35,7 +34,7 @@ object Fs2RedisMasterSlaveStringsDemo extends LoggerIOApp {

val connection: Resource[IO, RedisMasterSlaveConnection[String, String]] =
Resource.liftF(RedisURI.make[IO](redisURI)).flatMap { uri =>
RedisMasterSlave[IO, String, String](stringCodec, uri)(Some(JReadFrom.MASTER_PREFERRED))
RedisMasterSlave[IO, String, String](stringCodec, uri)(Some(ReadFrom.MasterPreferred))
}

connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import dev.profunktor.redis4cats.algebra.RedisCommands
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis
import dev.profunktor.redis4cats.transactions._

object Fs2RedisTransactionsDemo extends LoggerIOApp {
object RedisTransactionsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ class LivePubSubCommands[F[_]: ConcurrentEffect: ContextShift: Log, K, V](

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.evalMap { message =>
val getOrCreateTopicListener = PubSubInternals[F, K, V](state, subConnection)
for {
st <- state.get
_ <- getOrCreateTopicListener(channel)(st)
_ <- JRFuture { Sync[F].delay(pubConnection.async().publish(channel.value, message)) }
} yield ()
state.get.flatMap { st =>
PubSubInternals[F, K, V](state, subConnection).apply(channel)(st) *>
JRFuture { Sync[F].delay(pubConnection.async().publish(channel.underlying, message)) }
}.void
}

override def pubSubChannels: Stream[F, List[K]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package dev.profunktor.redis4cats.interpreter.pubsub

import cats.effect.{ Concurrent, ContextShift, Sync }
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.all._
import dev.profunktor.redis4cats.algebra.PubSubStats
import dev.profunktor.redis4cats.domain._
import dev.profunktor.redis4cats.streams.Subscription
Expand All @@ -44,10 +43,9 @@ class LivePubSubStats[F[_]: Concurrent: ContextShift, K, V](

override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
Stream.eval {
for {
kv <- JRFuture(Sync[F].delay(pubConnection.async().pubsubNumsub(channels.map(_.value): _*)))
rs <- Sync[F].delay(kv.asScala.toList.map { case (k, n) => Subscription(LiveChannel[K](k), n) })
} yield rs
JRFuture(Sync[F].delay(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))).flatMap { kv =>
Sync[F].delay(kv.asScala.toList.map { case (k, n) => Subscription(LiveChannel[K](k), n) })
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,9 @@ object PubSub {
uri: JRedisURI
): Stream[F, SubscribeCommands[Stream[F, ?], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec, uri)

for {
state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]]))
sConn <- Stream.bracket(acquire)(release)
} yield new Subscriber[F, K, V](state, sConn)
Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])).flatMap { st =>
Stream.bracket(acquire)(release).map(new Subscriber(st, _))
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Publisher[F[_]: ConcurrentEffect: ContextShift, K, V](pubConnection: State

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.evalMap { message =>
JRFuture { Sync[F].delay(pubConnection.async().publish(channel.value, message)) }.void
JRFuture { Sync[F].delay(pubConnection.async().publish(channel.underlying, message)) }.void
}

override def pubSubChannels: Stream[F, List[K]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,26 @@ import dev.profunktor.redis4cats.interpreter.pubsub.internals.{ PubSubInternals,
import dev.profunktor.redis4cats.domain.RedisChannel
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import fs2.Stream
import fs2.concurrent.Topic
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

class Subscriber[F[_]: ConcurrentEffect: ContextShift: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
) extends SubscribeCommands[Stream[F, ?], K, V] {

override def subscribe(channel: RedisChannel[K]): Stream[F, V] = {
val getOrCreateTopicListener = PubSubInternals[F, K, V](state, subConnection)
val setup: F[Topic[F, Option[V]]] =
for {
st <- state.get
topic <- getOrCreateTopicListener(channel)(st)
_ <- JRFuture(Sync[F].delay(subConnection.async().subscribe(channel.value)))
} yield topic

Stream.eval(setup).flatMap(_.subscribe(500).unNone)
}
override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
Stream
.eval(
state.get.flatMap { st =>
PubSubInternals[F, K, V](state, subConnection).apply(channel)(st) <*
JRFuture(Sync[F].delay(subConnection.async().subscribe(channel.underlying)))
}
)
.flatMap(_.subscribe(500).unNone)

override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
Stream.eval {
JRFuture(Sync[F].delay(subConnection.async().unsubscribe(channel.value))).void
JRFuture(Sync[F].delay(subConnection.async().unsubscribe(channel.underlying))).void
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object PubSubInternals {
): RedisPubSubListener[K, V] =
new RedisPubSubListener[K, V] {
override def message(ch: K, msg: V): Unit =
if (ch == channel.value) {
if (ch == channel.underlying) {
topic.publish1(Option(msg)).toIO.unsafeRunAsync(_ => ())
}
override def message(pattern: K, channel: K, message: V): Unit = this.message(channel, message)
Expand All @@ -47,15 +47,14 @@ object PubSubInternals {
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
)(implicit F: ConcurrentEffect[F], L: Log[F]): GetOrCreateTopicListener[F, K, V] = { channel => st =>
st.get(channel.value)
st.get(channel.underlying)
.fold {
for {
topic <- Topic[F, Option[V]](None)
listener = defaultListener(channel, topic)
_ <- L.info(s"Creating listener for channel: $channel")
_ <- F.delay(subConnection.addListener(listener))
_ <- state.update(_.updated(channel.value, topic))
} yield topic
Topic[F, Option[V]](None).flatTap { topic =>
val listener = defaultListener(channel, topic)
L.info(s"Creating listener for channel: $channel") *>
F.delay(subConnection.addListener(listener)) *>
state.update(_.updated(channel.underlying, topic))
}
}(F.pure)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private[streams] class RedisRawStreaming[F[_]: Concurrent: ContextShift, K, V](
override def xAdd(key: K, body: Map[K, V]): F[MessageId] =
JRFuture {
Sync[F].delay(client.async().xadd(key, body.asJava))
}.map(MessageId.apply)
}.map(MessageId)

override def xRead(streams: Set[StreamingOffset[K]]): F[List[StreamingMessageWithId[K, V]]] = {
val offsets = streams.map(s => StreamOffset.from(s.key, s.offset)).toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object RedisStream {
.fromConnectionFuture {
Sync[F].delay(client.underlying.connectAsync[K, V](codec.underlying, uri))
}
.map(c => new RedisRawStreaming(c))
.map(new RedisRawStreaming(_))

val release: RedisRawStreaming[F, K, V] => F[Unit] = c =>
JRFuture.fromCompletableFuture(Sync[F].delay(c.client.closeAsync())) *>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@

package dev.profunktor.redis4cats

import cats.effect.{ Clock, ContextShift, IO, Timer }
import cats.effect._
import cats.syntax.apply._
import cats.syntax.functor._
import dev.profunktor.redis4cats.algebra._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.domain.{ LiveRedisCodec, RedisCodec }
import dev.profunktor.redis4cats.connection.{ RedisClient, RedisURI }
import dev.profunktor.redis4cats.domain.RedisCodec
import dev.profunktor.redis4cats.interpreter.Redis
import io.lettuce.core.{ RedisURI => JRedisURI }
import io.lettuce.core.codec.{ StringCodec => JStringCodec }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Suite }

import scala.concurrent.{ ExecutionContext, SyncVar }
import scala.sys.process.{ Process, ProcessLogger }
import scala.util.Random
Expand All @@ -44,8 +41,6 @@ trait DockerRedis extends BeforeAndAfterAll with BeforeAndAfterEach { self: Suit

lazy val redisPort: Int = 6379

lazy val redisUri: JRedisURI = JRedisURI.create("redis://localhost")

private var dockerInstanceId: Option[String] = None

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
Expand All @@ -71,13 +66,14 @@ trait DockerRedis extends BeforeAndAfterAll with BeforeAndAfterEach { self: Suit
dockerInstanceId.foreach(stopRedis(_, clearContainers))
}

private val stringCodec = LiveRedisCodec(JStringCodec.UTF8)
private val stringCodec = RedisCodec.Utf8

private def mkRedis[K, V](codec: RedisCodec[K, V]) =
RedisClient[IO](redisUri)
.flatMap { client =>
Redis[IO, K, V](client, codec, redisUri)
}
for {
uri <- Resource.liftF(RedisURI.make[IO]("redis://localhost"))
client <- RedisClient[IO](uri)
redis <- Redis[IO, K, V](client, codec, uri)
} yield redis

def withAbstractRedis[A, K, V](f: RedisCommands[IO, K, V] => IO[A])(codec: RedisCodec[K, V]): Unit =
mkRedis(codec).use(f).void.unsafeRunSync()
Expand Down
Loading

0 comments on commit 5ed302d

Please sign in to comment.