Skip to content

Commit

Permalink
Merge pull request #15 from gvolpe/upgrade/cats-effect-1.0.0
Browse files Browse the repository at this point in the history
Upgrade to cats-effect 1.0.0
  • Loading branch information
gvolpe authored Sep 8, 2018
2 parents 9c0d7e4 + 31e23f1 commit 04e8b84
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 14 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name := """fs2-redis-root"""

organization in ThisBuild := "com.github.gvolpe"

version in ThisBuild := "0.2.0"
version in ThisBuild := "0.3.0"

crossScalaVersions in ThisBuild := Seq("2.12.6")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.github.gvolpe.fs2redis.algebra.{PubSubCommands, PublishCommands, Subs
import com.github.gvolpe.fs2redis.model._
import com.github.gvolpe.fs2redis.util.{JRFuture, Log}
import fs2.Stream
import fs2.async.mutable
import fs2.concurrent.Topic
import io.lettuce.core.RedisURI
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

Expand Down Expand Up @@ -58,7 +58,7 @@ object Fs2PubSub {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec, uri)
// One exclusive connection for subscriptions and another connection for publishing / stats
for {
state <- Stream.eval(Ref.of(Map.empty[K, mutable.Topic[F, Option[V]]]))
state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]]))
sConn <- Stream.bracket(acquire)(release)
pConn <- Stream.bracket(acquire)(release)
subs <- Stream.emit(new Fs2PubSubCommands[F, K, V](state, sConn, pConn))
Expand Down Expand Up @@ -91,7 +91,7 @@ object Fs2PubSub {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec, uri)

for {
state <- Stream.eval(Ref.of(Map.empty[K, mutable.Topic[F, Option[V]]]))
state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]]))
sConn <- Stream.bracket(acquire)(release)
} yield new Fs2Subscriber[F, K, V](state, sConn)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.github.gvolpe.fs2redis.algebra.{PubSubCommands, PubSubStats, Subscrib
import com.github.gvolpe.fs2redis.interpreter.pubsub.internals.{Fs2PubSubInternals, PubSubState}
import com.github.gvolpe.fs2redis.model
import com.github.gvolpe.fs2redis.model.{Fs2RedisChannel, Subscription}
import com.github.gvolpe.fs2redis.util.{JRFuture, Log}
import com.github.gvolpe.fs2redis.util.JRFuture
import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.github.gvolpe.fs2redis.interpreter.pubsub.internals.{Fs2PubSubInterna
import com.github.gvolpe.fs2redis.model.Fs2RedisChannel
import com.github.gvolpe.fs2redis.util.JRFuture
import fs2.Stream
import fs2.async.mutable.Topic
import fs2.concurrent.Topic
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

class Fs2Subscriber[F[_], K, V](state: Ref[F, PubSubState[F, K, V]], subConnection: StatefulRedisPubSubConnection[K, V])(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import cats.effect.syntax.effect._
import cats.syntax.all._
import com.github.gvolpe.fs2redis.model.Fs2RedisChannel
import com.github.gvolpe.fs2redis.util.Log
import fs2.async.mutable
import fs2.concurrent.Topic
import io.lettuce.core.pubsub.{RedisPubSubListener, StatefulRedisPubSubConnection}

object Fs2PubSubInternals {

private[fs2redis] def defaultListener[F[_]: ConcurrentEffect, K, V](
fs2RedisChannel: Fs2RedisChannel[K],
topic: mutable.Topic[F, Option[V]]): RedisPubSubListener[K, V] =
topic: Topic[F, Option[V]]): RedisPubSubListener[K, V] =
new RedisPubSubListener[K, V] {
override def message(channel: K, message: V): Unit =
if (channel == fs2RedisChannel.value) {
Expand All @@ -49,7 +49,7 @@ object Fs2PubSubInternals {
st.get(channel.value)
.fold {
for {
topic <- fs2.async.topic[F, Option[V]](None)
topic <- Topic[F, Option[V]](None)
listener = defaultListener(channel, topic)
_ <- L.info(s"Creating listener for channel: $channel")
_ <- F.delay(subConnection.addListener(listener))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package com.github.gvolpe.fs2redis.interpreter.pubsub

import com.github.gvolpe.fs2redis.model.Fs2RedisChannel
import fs2.async.mutable
import fs2.async.mutable.Topic
import fs2.concurrent.Topic

package object internals {
private[pubsub] type PubSubState[F[_], K, V] = Map[K, mutable.Topic[F, Option[V]]]
private[pubsub] type PubSubState[F[_], K, V] = Map[K, Topic[F, Option[V]]]
private[pubsub] type GetOrCreateTopicListener[F[_], K, V] =
Fs2RedisChannel[K] => PubSubState[F, K, V] => F[Topic[F, Option[V]]]
}
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import sbt._
object Dependencies {

object Versions {
val catsEffect = "1.0.0-RC3"
val fs2 = "1.0.0-M4"
val catsEffect = "1.0.0"
val fs2 = "1.0.0-M5"
val lettuce = "5.1.0.M1"
val scribe = "2.6.0"

Expand Down

0 comments on commit 04e8b84

Please sign in to comment.