From fd5f543528c734d33c5f883be601e62de3abcd7f Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Sat, 1 Oct 2022 14:37:45 +0200 Subject: [PATCH] Upgrade to ZIO 2 (#453) * Upgrade to ZIO 2 * Fix 2.12 * Fix 2.12 --- .github/workflows/ci.yml | 2 +- actors/src/main/scala/zio/actors/Actor.scala | 13 +- .../src/main/scala/zio/actors/ActorRef.scala | 44 ++-- .../main/scala/zio/actors/ActorSystem.scala | 190 ++++++++++-------- .../main/scala/zio/actors/ActorsConfig.scala | 15 +- .../main/scala/zio/actors/Supervisor.scala | 9 +- .../test/scala/zio/actors/ActorsSpec.scala | 64 +++--- .../test/scala/zio/actors/RemoteSpec.scala | 61 +++--- .../zio/actors/akka/AkkaTypedActor.scala | 4 +- .../actors/akka/AkkaTypedActorRefLocal.scala | 4 +- .../zio/actors/akka/ActorsAkkaSpec.scala | 60 +++--- build.sbt | 10 +- .../examples/persistence/ShoppingCart.scala | 54 +++-- .../persistence/ShoppingCartSpec.scala | 72 +++---- .../actors/persistence/jdbc/JDBCConfig.scala | 6 +- .../actors/persistence/jdbc/JDBCJournal.scala | 24 ++- .../zio/actors/persistence/EventSource.scala | 41 ++-- .../persistence/PersistenceConfig.scala | 24 ++- .../persistence/journal/InMemJournal.scala | 9 +- .../actors/persistence/PersistenceSpec.scala | 30 ++- project/BuildHelper.scala | 9 +- 21 files changed, 391 insertions(+), 354 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 72117d9b..ee7d3161 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: fail-fast: false matrix: java: ['adopt@1.8', 'adopt@1.11'] - scala: ['2.12.15', '2.13.3'] + scala: ['2.12.15', '2.13.8'] steps: - uses: actions/checkout@v2.3.4 - uses: olafurpg/setup-scala@v10 diff --git a/actors/src/main/scala/zio/actors/Actor.scala b/actors/src/main/scala/zio/actors/Actor.scala index e2a5d70f..1fa06631 100644 --- a/actors/src/main/scala/zio/actors/Actor.scala +++ b/actors/src/main/scala/zio/actors/Actor.scala @@ -1,7 +1,6 @@ package zio.actors import zio.actors.Actor.PendingMessage -import zio.clock.Clock import zio.{ Supervisor => _, _ } object Actor { @@ -35,19 +34,19 @@ object Actor { context: Context, optOutActorSystem: () => Task[Unit], mailboxSize: Int = DefaultActorMailboxSize - )(initial: S): URIO[R with Clock, Actor[F]] = { + )(initial: S): URIO[R, Actor[F]] = { - def process[A](msg: PendingMessage[F, A], state: Ref[S]): URIO[R with Clock, Unit] = + def process[A](msg: PendingMessage[F, A], state: Ref[S]): URIO[R, Unit] = for { s <- state.get (fa, promise) = msg receiver = receive(s, fa, context) completer = ((s: S, a: A) => state.set(s) *> promise.succeed(a)).tupled - _ <- receiver.foldM( + _ <- receiver.foldZIO( e => supervisor .supervise(receiver, e) - .foldM(_ => promise.fail(e), completer), + .foldZIO(_ => promise.fail(e), completer), completer ) } yield () @@ -70,7 +69,7 @@ object Actor { context: Context, optOutActorSystem: () => Task[Unit], mailboxSize: Int = DefaultActorMailboxSize - )(initial: S): RIO[R with Clock, Actor[F]] + )(initial: S): RIO[R, Actor[F]] } @@ -104,7 +103,7 @@ private[actors] final class Actor[-F[+_]]( this.stop } - val stop: Task[List[_]] = + val stop: Task[Chunk[_]] = for { tall <- queue.takeAll _ <- queue.shutdown diff --git a/actors/src/main/scala/zio/actors/ActorRef.scala b/actors/src/main/scala/zio/actors/ActorRef.scala index f5288642..f386c82d 100644 --- a/actors/src/main/scala/zio/actors/ActorRef.scala +++ b/actors/src/main/scala/zio/actors/ActorRef.scala @@ -1,10 +1,10 @@ package zio.actors -import java.io.{ IOException, ObjectInputStream, ObjectOutputStream, ObjectStreamException } +import zio.nio.channels.AsynchronousSocketChannel +import zio.nio.{ InetAddress, InetSocketAddress } +import zio.{ Chunk, Runtime, Task, UIO, Unsafe, ZIO } -import zio.nio.core.{ InetAddress, InetSocketAddress, SocketAddress } -import zio.nio.core.channels.AsynchronousSocketChannel -import zio.{ IO, Runtime, Task, UIO } +import java.io.{ IOException, ObjectInputStream, ObjectOutputStream, ObjectStreamException } /** * Reference to actor that might reside on local JVM instance or be available via remote communication @@ -41,7 +41,7 @@ sealed trait ActorRef[-F[+_]] extends Serializable { /** * Stops actor and all its children */ - val stop: Task[List[_]] + val stop: Task[Chunk[_]] } @@ -73,13 +73,15 @@ private[actors] sealed abstract class ActorRefSerial[-F[+_]](private var actorPa (_, addr, port, _) = resolved address <- InetAddress .byName(addr.value) - .flatMap(iAddr => SocketAddress.inetSocketAddress(iAddr, port.value)) + .flatMap(iAddr => InetSocketAddress.inetAddress(iAddr, port.value)) } yield new ActorRefRemote[F](actorPath, address) - ActorRefSerial.runtimeForResolve.unsafeRun(remoteRef) + Unsafe.unsafe { implicit u => + ActorRefSerial.runtimeForResolve.unsafe.run(remoteRef).getOrThrowFiberFailure() + } } - override val path: UIO[String] = UIO(actorPath) + override val path: UIO[String] = ZIO.succeed(actorPath) } private[actors] final class ActorRefLocal[-F[+_]]( @@ -90,7 +92,7 @@ private[actors] final class ActorRefLocal[-F[+_]]( override def !(fa: F[_]): Task[Unit] = actor ! fa - override val stop: Task[List[_]] = actor.stop + override val stop: Task[Chunk[_]] = actor.stop @throws[IOException] private def writeObject(out: ObjectOutputStream): Unit = @@ -115,19 +117,21 @@ private[actors] final class ActorRefRemote[-F[+_]]( override def !(fa: F[_]): Task[Unit] = sendEnvelope[Unit](Command.Tell(fa)) - override val stop: Task[List[_]] = sendEnvelope(Command.Stop) + override val stop: Task[Chunk[_]] = sendEnvelope(Command.Stop) private def sendEnvelope[A](command: Command): Task[A] = - for { - client <- AsynchronousSocketChannel() - response <- for { - _ <- client.connect(address) - actorPath <- path - _ <- writeToWire(client, new Envelope(command, actorPath)) - response <- readFromWire(client) - } yield response.asInstanceOf[Either[Throwable, A]] - result <- IO.fromEither(response) - } yield result + ZIO.scoped { + for { + client <- AsynchronousSocketChannel.open + response <- for { + _ <- client.connect(address) + actorPath <- path + _ <- writeToWire(client, new Envelope(command, actorPath)) + response <- readFromWire(client) + } yield response.asInstanceOf[Either[Throwable, A]] + result <- ZIO.fromEither(response) + } yield result + } @throws[IOException] private def writeObject(out: ObjectOutputStream): Unit = diff --git a/actors/src/main/scala/zio/actors/ActorSystem.scala b/actors/src/main/scala/zio/actors/ActorSystem.scala index ab218bb0..4de571fe 100644 --- a/actors/src/main/scala/zio/actors/ActorSystem.scala +++ b/actors/src/main/scala/zio/actors/ActorSystem.scala @@ -1,16 +1,14 @@ package zio.actors -import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, File, ObjectInputStream, ObjectOutputStream } -import java.nio.ByteBuffer - -import zio.{ Chunk, IO, Promise, RIO, Ref, Task, UIO, ZIO } import zio.actors.Actor.{ AbstractStateful, Stateful } import zio.actors.ActorSystemUtils._ import zio.actors.ActorsConfig._ -import zio.clock.Clock -import zio.nio.core.{ Buffer, InetAddress, SocketAddress } -import zio.nio.core.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel } +import zio.nio.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel } +import zio.nio.{ Buffer, InetAddress, InetSocketAddress } +import zio.{ Chunk, Promise, RIO, Ref, Task, ZIO } +import java.io._ +import java.nio.ByteBuffer import scala.io.Source /** @@ -32,10 +30,10 @@ object ActorSystem { initActorRefMap <- Ref.make(Map.empty[String, Any]) config <- retrieveConfig(configFile) remoteConfig <- retrieveRemoteConfig(sysName, config) - actorSystem <- IO.effect(new ActorSystem(sysName, config, remoteConfig, initActorRefMap, parentActor = None)) - _ <- IO - .effectTotal(remoteConfig) - .flatMap(_.fold[Task[Unit]](IO.unit)(c => actorSystem.receiveLoop(c.addr, c.port))) + actorSystem <- ZIO.attempt(new ActorSystem(sysName, config, remoteConfig, initActorRefMap, parentActor = None)) + _ <- ZIO + .succeed(remoteConfig) + .flatMap(_.fold[Task[Unit]](ZIO.unit)(c => actorSystem.receiveLoop(c.addr, c.port))) } yield actorSystem } @@ -71,7 +69,7 @@ final class Context private[actors] ( sup: Supervisor[R], init: S, stateful: Stateful[R, S, F1] - ): ZIO[R with Clock, Throwable, ActorRef[F1]] = + ): ZIO[R, Throwable, ActorRef[F1]] = for { actorRef <- actorSystem.make(actorName, sup, init, stateful) children <- childrenRef.get @@ -127,11 +125,11 @@ final class ActorSystem private[actors] ( sup: Supervisor[R], init: S, stateful: AbstractStateful[R, S, F] - ): RIO[R with Clock, ActorRef[F]] = + ): RIO[R, ActorRef[F]] = for { map <- refActorMap.get finalName <- buildFinalName(parentActor.getOrElse(""), actorName) - _ <- if (map.contains(finalName)) IO.fail(new Exception(s"Actor $finalName already exists")) else IO.unit + _ <- ZIO.fail(new Exception(s"Actor $finalName already exists")).when(map.contains(finalName)) path = buildPath(actorSystemName, finalName, remoteConfig) derivedSystem = new ActorSystem(actorSystemName, config, remoteConfig, refActorMap, Some(finalName)) childrenSet <- Ref.make(Set.empty[ActorRef[Any]]) @@ -165,17 +163,17 @@ final class ActorSystem private[actors] ( actorRef <- actorMap.get(actorName) match { case Some(value) => for { - actor <- IO.effectTotal(value.asInstanceOf[Actor[F]]) + actor <- ZIO.succeed(value.asInstanceOf[Actor[F]]) } yield new ActorRefLocal(path, actor) case None => - IO.fail(new Exception(s"No such actor $actorName in local ActorSystem.")) + ZIO.fail(new Exception(s"No such actor $actorName in local ActorSystem.")) } } yield actorRef else for { address <- InetAddress .byName(addr.value) - .flatMap(iAddr => SocketAddress.inetSocketAddress(iAddr, port.value)) + .flatMap(iAddr => InetSocketAddress.inetAddress(iAddr, port.value)) } yield new ActorRefRemote[F](path, address) } yield actorRef @@ -198,61 +196,66 @@ final class ActorSystem private[actors] ( (_, _, _, actorName) = solvedPath _ <- refActorMap.update(_ - actorName) children <- childrenRef.get - _ <- ZIO.foreach_(children)(_.stop) + _ <- ZIO.foreachDiscard(children)(_.stop) _ <- childrenRef.set(Set.empty) } yield () private def receiveLoop(address: ActorsConfig.Addr, port: ActorsConfig.Port): Task[Unit] = - for { - addr <- InetAddress.byName(address.value) - address <- SocketAddress.inetSocketAddress(addr, port.value) - p <- Promise.make[Nothing, Unit] - channel <- AsynchronousServerSocketChannel() - loopEffect = for { - _ <- channel.bind(address) - - _ <- p.succeed(()) - - loop = for { - worker <- channel.accept - obj <- readFromWire(worker) - envelope = obj.asInstanceOf[Envelope] - actorMap <- refActorMap.get - remoteActorPath <- resolvePath(envelope.recipient).map(_._4) - _ <- actorMap.get(remoteActorPath) match { - case Some(value) => - for { - actor <- - IO - .effect(value.asInstanceOf[Actor[Any]]) - .mapError(throwable => - new Exception(s"System internal exception - ${throwable.getMessage}") - ) - response <- actor.unsafeOp(envelope.command).either - _ <- response match { - case Right( - stream: zio.stream.ZStream[ - Any @unchecked, - Throwable @unchecked, - Any @unchecked - ] - ) => - stream.foreach(writeToWire(worker, _)) - case _ => writeToWire(worker, response) - } - } yield () - case None => - for { - responseError <- IO.fail(new Exception("No such remote actor")).either - _ <- writeToWire(worker, responseError) - } yield () - } - } yield () - _ <- loop.forever - } yield () - _ <- loopEffect.onTermination(_ => channel.close.catchAll(_ => ZIO.unit)).fork - _ <- p.await - } yield () + ZIO.scoped { + for { + addr <- InetAddress.byName(address.value) + address <- InetSocketAddress.inetAddress(addr, port.value) + p <- Promise.make[Nothing, Unit] + loopEffect = ZIO.scoped { + for { + _ <- ZIO.succeed(s"Binding $address") + channel <- AsynchronousServerSocketChannel.open + _ <- channel.bind(Some(address)) + + _ <- p.succeed(()) + + loop = for { + worker <- channel.accept + obj <- readFromWire(worker) + envelope = obj.asInstanceOf[Envelope] + actorMap <- refActorMap.get + remoteActorPath <- resolvePath(envelope.recipient).map(_._4) + _ <- actorMap.get(remoteActorPath) match { + case Some(value) => + for { + actor <- + ZIO + .attempt(value.asInstanceOf[Actor[Any]]) + .mapError(throwable => + new Exception(s"System internal exception - ${throwable.getMessage}") + ) + response <- actor.unsafeOp(envelope.command).either + _ <- response match { + case Right( + stream: zio.stream.ZStream[ + Any @unchecked, + Throwable @unchecked, + Any @unchecked + ] + ) => + stream.foreach(writeToWire(worker, _)) + case _ => writeToWire(worker, response) + } + } yield () + case None => + for { + responseError <- ZIO.fail(new Exception("No such remote actor")).either + _ <- writeToWire(worker, responseError) + } yield () + } + } yield () + _ <- loop.forever + } yield () + } + _ <- loopEffect.fork + _ <- p.await + } yield () + } } /* INTERNAL API */ @@ -271,9 +274,9 @@ private[actors] object ActorSystemUtils { val address = Addr(value.group(2)) val port = Port(value.group(3).toInt) val actorName = "/" + value.group(4) - IO.succeed((actorSystemName, address, port, actorName)) - case None => - IO.fail( + ZIO.succeed((actorSystemName, address, port, actorName)) + case _ => + ZIO.fail( new Exception( "Invalid path provided. The pattern is zio://YOUR_ACTOR_SYSTEM_NAME@ADDRES:PORT/RELATIVE_ACTOR_PATH" ) @@ -282,51 +285,64 @@ private[actors] object ActorSystemUtils { private[actors] def buildFinalName(parentActorName: String, actorName: String): Task[String] = actorName match { - case "" => IO.fail(new Exception("Actor actor must not be empty")) - case null => IO.fail(new Exception("Actor actor must not be null")) - case RegexName(_*) => UIO.effectTotal(parentActorName + "/" + actorName) - case _ => IO.fail(new Exception(s"Invalid actor name provided $actorName. Valid symbols are -_.*$$+:@&=,!~';")) + case "" => ZIO.fail(new Exception("Actor actor must not be empty")) + case null => ZIO.fail(new Exception("Actor actor must not be null")) + case RegexName(_*) => ZIO.succeed(parentActorName + "/" + actorName) + case _ => ZIO.fail(new Exception(s"Invalid actor name provided $actorName. Valid symbols are -_.*$$+:@&=,!~';")) } def buildPath(actorSystemName: String, actorPath: String, remoteConfig: Option[RemoteConfig]): String = s"zio://$actorSystemName@${remoteConfig.map(c => c.addr.value + ":" + c.port.value).getOrElse("0.0.0.0:0000")}$actorPath" def retrieveConfig(configFile: Option[File]): Task[Option[String]] = - configFile.fold[Task[Option[String]]](Task.none) { file => - IO(Source.fromFile(file)).toManaged(f => UIO(f.close())).use(s => IO.some(s.mkString)) + configFile.fold[Task[Option[String]]](ZIO.none) { file => + ZIO.scoped { + ZIO + .acquireRelease(ZIO.attempt(Source.fromFile(file)))(f => ZIO.succeed(f.close())) + .flatMap(s => ZIO.some(s.mkString)) + } } def retrieveRemoteConfig(sysName: String, configStr: Option[String]): Task[Option[RemoteConfig]] = - configStr.fold[Task[Option[RemoteConfig]]](Task.none)(file => ActorsConfig.getRemoteConfig(sysName, file)) + configStr.fold[Task[Option[RemoteConfig]]](ZIO.none)(file => ActorsConfig.getRemoteConfig(sysName, file)) def objFromByteArray(bytes: Array[Byte]): Task[Any] = - Task(new ObjectInputStream(new ByteArrayInputStream(bytes))).toManaged(s => UIO(s.close())).use { s => - Task(s.readObject()) + ZIO.scoped { + ZIO + .acquireRelease(ZIO.attempt(new ObjectInputStream(new ByteArrayInputStream(bytes))))(s => + ZIO.succeed(s.close()) + ) + .flatMap { s => + ZIO.attempt(s.readObject()) + } } def readFromWire(socket: AsynchronousSocketChannel): Task[Any] = for { - size <- socket.read(4) + size <- socket.readChunk(4) buffer <- Buffer.byte(size) intBuffer <- buffer.asIntBuffer toRead <- intBuffer.get(0) - content <- socket.read(toRead) + content <- socket.readChunk(toRead) bytes = content.toArray obj <- objFromByteArray(bytes) } yield obj def objToByteArray(obj: Any): Task[Array[Byte]] = for { - stream <- UIO(new ByteArrayOutputStream()) - bytes <- Task(new ObjectOutputStream(stream)).toManaged(s => UIO(s.close())).use { s => - Task(s.writeObject(obj)) *> UIO(stream.toByteArray) + stream <- ZIO.succeed(new ByteArrayOutputStream()) + bytes <- ZIO.scoped { + ZIO.acquireRelease(ZIO.attempt(new ObjectOutputStream(stream)))(s => ZIO.succeed(s.close())).flatMap { + s => + ZIO.attempt(s.writeObject(obj)) *> ZIO.succeed(stream.toByteArray) + } } } yield bytes def writeToWire(socket: AsynchronousSocketChannel, obj: Any): Task[Unit] = for { bytes <- objToByteArray(obj) - _ <- socket.write(Chunk.fromArray(ByteBuffer.allocate(4).putInt(bytes.size).array())) - _ <- socket.write(Chunk.fromArray(bytes)) + _ <- socket.writeChunk(Chunk.fromArray(ByteBuffer.allocate(4).putInt(bytes.length).array())) + _ <- socket.writeChunk(Chunk.fromArray(bytes)) } yield () } diff --git a/actors/src/main/scala/zio/actors/ActorsConfig.scala b/actors/src/main/scala/zio/actors/ActorsConfig.scala index 1ff47dc7..37cf3f35 100644 --- a/actors/src/main/scala/zio/actors/ActorsConfig.scala +++ b/actors/src/main/scala/zio/actors/ActorsConfig.scala @@ -1,10 +1,9 @@ package zio.actors -import zio.{ Task, ZIO } -import zio.config.{ ConfigDescriptor, ZConfig } +import zio.config.ConfigDescriptor import zio.config.ConfigDescriptor._ import zio.config.typesafe.TypesafeConfig -import zio.Tag +import zio.{ Tag, Task, ZIO } private[actors] object ActorsConfig { @@ -14,16 +13,14 @@ private[actors] object ActorsConfig { val remoteConfig: ConfigDescriptor[Option[RemoteConfig]] = nested("remoting") { - (string("hostname").xmap[Addr](Addr, _.value) |@| - int("port").xmap[Port](Port, _.value))(RemoteConfig.apply, RemoteConfig.unapply) + (string("hostname").transform[Addr](Addr, _.value) zip + int("port").transform[Port](Port, _.value)).to[RemoteConfig] }.optional private def selectiveSystemConfig[T](systemName: String, configT: ConfigDescriptor[T]) = nested(systemName) { nested("zio") { - nested("actors") { - configT - } + nested("actors")(configT) } } @@ -33,7 +30,7 @@ private[actors] object ActorsConfig { configDescriptor: ConfigDescriptor[T] )(implicit tag: Tag[T]): Task[T] = ZIO - .access[ZConfig[T]](_.get) + .service[T] .provideLayer(TypesafeConfig.fromHoconString[T](configStr, selectiveSystemConfig(systemName, configDescriptor))) def getRemoteConfig(systemName: String, configStr: String): Task[Option[RemoteConfig]] = diff --git a/actors/src/main/scala/zio/actors/Supervisor.scala b/actors/src/main/scala/zio/actors/Supervisor.scala index cd94ddef..9fb95192 100644 --- a/actors/src/main/scala/zio/actors/Supervisor.scala +++ b/actors/src/main/scala/zio/actors/Supervisor.scala @@ -1,10 +1,9 @@ package zio.actors -import zio.clock.Clock -import zio.{ IO, RIO, Schedule, URIO, ZIO } +import zio.{ RIO, Schedule, URIO, ZIO } private[actors] trait Supervisor[-R] { - def supervise[R0 <: R, A](zio: RIO[R0, A], error: Throwable): ZIO[R0 with Clock, Unit, A] + def supervise[R0 <: R, A](zio: RIO[R0, A], error: Throwable): ZIO[R0, Unit, A] } /** @@ -15,14 +14,14 @@ object Supervisor { final def none: Supervisor[Any] = retry(Schedule.once) final def retry[R, A](policy: Schedule[R, Throwable, A]): Supervisor[R] = - retryOrElse(policy, (_: Throwable, _: A) => IO.unit) + retryOrElse(policy, (_: Throwable, _: A) => ZIO.unit) final def retryOrElse[R, A]( policy: Schedule[R, Throwable, A], orElse: (Throwable, A) => URIO[R, Unit] ): Supervisor[R] = new Supervisor[R] { - override def supervise[R0 <: R, A0](zio: RIO[R0, A0], error: Throwable): ZIO[R0 with Clock, Unit, A0] = + override def supervise[R0 <: R, A0](zio: RIO[R0, A0], error: Throwable): ZIO[R0, Unit, A0] = zio .retryOrElse(policy, (e: Throwable, a: A) => orElse(e, a) *> ZIO.fail(error)) .mapError(_ => ()) diff --git a/actors/src/test/scala/zio/actors/ActorsSpec.scala b/actors/src/test/scala/zio/actors/ActorsSpec.scala index 72f46ce8..46ef03a3 100644 --- a/actors/src/test/scala/zio/actors/ActorsSpec.scala +++ b/actors/src/test/scala/zio/actors/ActorsSpec.scala @@ -1,15 +1,14 @@ package zio.actors import java.util.concurrent.atomic.AtomicBoolean - import zio.actors.Actor.Stateful -import zio.stream.Stream -import zio.{ Chunk, IO, Ref, Schedule, Task, UIO } +import zio.stream.{ Stream, ZStream } +import zio.{ Chunk, IO, Ref, Schedule, Task, UIO, ZIO } import zio.test._ import zio.test.Assertion._ object CounterUtils { - sealed trait Message[+_] + sealed trait Message[+A] case object Reset extends Message[Unit] case object Increase extends Message[Unit] case object Get extends Message[Int] @@ -17,19 +16,19 @@ object CounterUtils { } object TickUtils { - sealed trait Message[+_] + sealed trait Message[+A] case object Tick extends Message[Unit] } object StopUtils { - sealed trait Msg[+_] + sealed trait Msg[+A] case object Letter extends Msg[Unit] } -object ActorsSpec extends DefaultRunnableSpec { +object ActorsSpec extends ZIOSpecDefault { def spec = suite("Test the basic actor behavior")( - testM("Sequential message processing") { + test("Sequential message processing") { import CounterUtils._ val handler: Stateful[Any, Int, Message] = new Stateful[Any, Int, Message] { @@ -39,10 +38,10 @@ object ActorsSpec extends DefaultRunnableSpec { context: Context ): UIO[(Int, A)] = msg match { - case Reset => UIO((0, ())) - case Increase => UIO((state + 1, ())) - case Get => UIO((state, state)) - case IncreaseUpTo(upper) => UIO((upper, Stream.fromIterable(state until upper))) + case Reset => ZIO.succeed((0, ())) + case Increase => ZIO.succeed((state + 1, ())) + case Get => ZIO.succeed((state, state)) + case IncreaseUpTo(upper) => ZIO.succeed((upper, ZStream.fromIterable(state until upper))) } } @@ -57,12 +56,9 @@ object ActorsSpec extends DefaultRunnableSpec { c3 <- actor ? IncreaseUpTo(20) vals <- c3.runCollect c4 <- actor ? Get - } yield assert(c1)(equalTo(2)) && - assert(c2)(equalTo(0)) && - assert(vals)(equalTo(Chunk.apply(0 until 20: _*))) && - assert(c4)(equalTo(20)) + } yield assertTrue(c1 == 2, c2 == 0, vals == Chunk.apply(0 until 20: _*), c4 == 20) }, - testM("Error recovery by retrying") { + test("Error recovery by retrying") { import TickUtils._ val maxRetries = 10 @@ -79,8 +75,8 @@ object ActorsSpec extends DefaultRunnableSpec { ref .updateAndGet(_ + 1) .flatMap { v => - if (v < maxRetries) IO.fail(new Exception("fail")) - else IO.succeed((state, state)) + if (v < maxRetries) ZIO.fail(new Exception("fail")) + else ZIO.succeed((state, state)) } } } @@ -94,9 +90,9 @@ object ActorsSpec extends DefaultRunnableSpec { actor <- system.make("actor1", policy, (), handler) _ <- actor ? Tick count <- ref.get - } yield assert(count)(equalTo(maxRetries)) + } yield assertTrue(count == maxRetries) }, - testM("Error recovery by fallback action") { + test("Error recovery by fallback action") { import TickUtils._ val handler = new Stateful[Any, Unit, Message] { @@ -106,7 +102,7 @@ object ActorsSpec extends DefaultRunnableSpec { context: Context ): IO[Throwable, (Unit, A)] = msg match { - case Tick => IO.fail(new Exception("fail")) + case Tick => ZIO.fail(new Exception("fail")) } } @@ -115,7 +111,7 @@ object ActorsSpec extends DefaultRunnableSpec { val policy = Supervisor.retryOrElse[Any, Long]( schedule, - (_, _) => IO.effectTotal(called.set(true)) + (_, _) => ZIO.succeed(called.set(true)) ) val program = for { @@ -124,9 +120,9 @@ object ActorsSpec extends DefaultRunnableSpec { _ <- actor ? Tick } yield () - assertM(program.run)(fails(anything)).andThen(assertM(IO.effectTotal(called.get))(isTrue)) + assertZIO(program.exit)(fails(anything)) && assertZIO(ZIO.succeed(called.get))(isTrue) }, - testM("Stopping actors") { + test("Stopping actors") { import StopUtils._ val handler = new Stateful[Any, Unit, Msg] { @@ -136,7 +132,7 @@ object ActorsSpec extends DefaultRunnableSpec { context: Context ): IO[Throwable, (Unit, A)] = msg match { - case Letter => IO.succeed(((), ())) + case Letter => ZIO.succeed(((), ())) } } for { @@ -146,11 +142,11 @@ object ActorsSpec extends DefaultRunnableSpec { _ <- actor ? Letter dump <- actor.stop } yield assert(dump)( - isSubtype[List[_]](anything) && - hasField[List[_], Int]("size", _.size, equalTo(0)) + isSubtype[Chunk[_]](anything) && + hasField[Chunk[_], Int]("size", _.size, equalTo(0)) ) }, - testM("Select local actor") { + test("Select local actor") { import TickUtils._ val handler = new Stateful[Any, Unit, Message] { @@ -160,7 +156,7 @@ object ActorsSpec extends DefaultRunnableSpec { context: Context ): IO[Throwable, (Unit, A)] = msg match { - case Tick => IO.succeed(((), ())) + case Tick => ZIO.succeed(((), ())) } } for { @@ -169,9 +165,9 @@ object ActorsSpec extends DefaultRunnableSpec { actor <- system.select[Message]("zio://test5@0.0.0.0:0000/actor1-1") _ <- actor ! Tick actorPath <- actor.path - } yield assert(actorPath)(equalTo("zio://test5@0.0.0.0:0000/actor1-1")) + } yield assertTrue(actorPath == "zio://test5@0.0.0.0:0000/actor1-1") }, - testM("Local actor does not exist") { + test("Local actor does not exist") { import TickUtils._ val handler = new Stateful[Any, Unit, Message] { @@ -181,7 +177,7 @@ object ActorsSpec extends DefaultRunnableSpec { context: Context ): IO[Throwable, (Unit, A)] = msg match { - case Tick => IO.succeed(((), ())) + case Tick => ZIO.succeed(((), ())) } } @@ -192,7 +188,7 @@ object ActorsSpec extends DefaultRunnableSpec { _ <- actor ! Tick } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails( hasField[Throwable, String]( diff --git a/actors/src/test/scala/zio/actors/RemoteSpec.scala b/actors/src/test/scala/zio/actors/RemoteSpec.scala index c21026fe..0dfb7382 100644 --- a/actors/src/test/scala/zio/actors/RemoteSpec.scala +++ b/actors/src/test/scala/zio/actors/RemoteSpec.scala @@ -3,13 +3,10 @@ package zio.actors import java.io.File import java.net.ConnectException +import zio.{ durationInt, Clock, Console, IO, ZIO } import zio.actors.Actor.Stateful -import zio.{ clock, console, IO } -import zio.test.DefaultRunnableSpec import zio.test._ import zio.test.Assertion._ -import zio.duration._ -import zio.test.environment.TestConsole import SpecUtils._ object SpecUtils { @@ -27,7 +24,7 @@ object SpecUtils { ): IO[MyErrorDomain, (Int, A)] = msg match { case Str(value) => - IO.effectTotal((state + 1, value + "received plus " + state + 1)) + ZIO.succeed((state + 1, value + "received plus " + state + 1)) } } @@ -46,19 +43,19 @@ object SpecUtils { case Ping(sender) => (for { path <- sender.path - _ <- console.putStrLn(s"Ping from: $path, sending pong") + _ <- Console.printLine(s"Ping from: $path, sending pong") _ <- sender ! Pong } yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]] case Pong => (for { - _ <- console.putStrLn("Received pong") - _ <- IO.succeed(1) + _ <- Console.printLine("Received pong") + _ <- ZIO.succeed(1) } yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]] case GameInit(to) => (for { - _ <- console.putStrLn("The game starts...") + _ <- Console.printLine("The game starts...") self <- context.self[PingPongProto] _ <- to ! Ping(self) } yield ((), ())).asInstanceOf[IO[Throwable, (Unit, A)]] @@ -75,18 +72,18 @@ object SpecUtils { context: Context ): IO[Throwable, (Unit, A)] = msg match { - case UnsafeMessage => IO.fail(new Exception("Error on remote side")) + case UnsafeMessage => ZIO.fail(new Exception("Error on remote side")) } } val configFile = Some(new File("./actors/src/test/resources/application.conf")) } -object RemoteSpec extends DefaultRunnableSpec { +object RemoteSpec extends ZIOSpecDefault { def spec = suite("RemoteSpec")( suite("Remote communication suite")( - testM("Remote test send message") { + test("Remote test send message") { for { actorSystemOne <- ActorSystem("testSystem11", configFile) _ <- actorSystemOne.make("actorOne", Supervisor.none, 0, handlerMessageTrait) @@ -95,9 +92,9 @@ object RemoteSpec extends DefaultRunnableSpec { "zio://testSystem11@127.0.0.1:9665/actorOne" ) result <- actorRef ? Str("ZIO-Actor response... ") - } yield assert(result)(equalTo("ZIO-Actor response... received plus 01")) + } yield assertTrue(result == "ZIO-Actor response... received plus 01") }, - testM("ActorRef serialization case") { + test("ActorRef serialization case") { for { actorSystemRoot <- ActorSystem("testSystem21", configFile) one <- actorSystemRoot.make("actorOne", Supervisor.none, (), protoHandler) @@ -111,25 +108,25 @@ object RemoteSpec extends DefaultRunnableSpec { _ <- one ! GameInit(remoteActor) - _ <- clock.sleep(2.seconds) + _ <- Clock.sleep(2.seconds) outputVector <- TestConsole.output - } yield assert(outputVector.size)(equalTo(3)) && - assert(outputVector(0))(equalTo("The game starts...\n")) && - assert(outputVector(1))( - equalTo("Ping from: zio://testSystem21@127.0.0.1:9667/actorOne, sending pong\n") - ) && - assert(outputVector(2))(equalTo("Received pong\n")) + } yield assertTrue( + outputVector.size == 3, + outputVector(0) == "The game starts...\n", + outputVector(1) == "Ping from: zio://testSystem21@127.0.0.1:9667/actorOne, sending pong\n", + outputVector(2) == "Received pong\n" + ) } ), suite("Error handling suite")( - testM("ActorRef not found case (in local actor system)") { + test("ActorRef not found case (in local actor system)") { val program = for { actorSystem <- ActorSystem("testSystem31", configFile) _ <- actorSystem.select[PingPongProto]("zio://testSystem31@127.0.0.1:9669/actorTwo") } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails( hasField[Throwable, String]( @@ -140,7 +137,7 @@ object RemoteSpec extends DefaultRunnableSpec { ) ) }, - testM("Remote system does not exist") { + test("Remote system does not exist") { val program = for { actorSystem <- ActorSystem("testSystem41", configFile) actorRef <- actorSystem.select[PingPongProto]( @@ -149,12 +146,12 @@ object RemoteSpec extends DefaultRunnableSpec { _ <- actorRef ! GameInit(actorRef) } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[ConnectException](anything)) && fails(hasField[Throwable, String]("message", _.getMessage, equalTo("Connection refused"))) ) }, - testM("Remote actor does not exist") { + test("Remote actor does not exist") { val program = for { actorSystemOne <- ActorSystem("testSystem51", configFile) _ <- ActorSystem("testSystem52", configFile) @@ -164,12 +161,12 @@ object RemoteSpec extends DefaultRunnableSpec { _ <- actorRef ? GameInit(actorRef) } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails(hasField[Throwable, String]("message", _.getMessage, equalTo("No such remote actor"))) ) }, - testM("On remote side error message processing error") { + test("On remote side error message processing error") { val program = for { actorSystemOne <- ActorSystem("testSystem61", configFile) _ <- actorSystemOne.make("actorOne", Supervisor.none, (), errorHandler) @@ -180,12 +177,12 @@ object RemoteSpec extends DefaultRunnableSpec { _ <- actorRef ? UnsafeMessage } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails(hasField[Throwable, String]("message", _.getMessage, equalTo("Error on remote side"))) ) }, - testM("remote test select actor with special symbols") { + test("remote test select actor with special symbols") { for { actorSystemOne <- ActorSystem("testSystem71", configFile) _ <- actorSystemOne.make("actor-One-;_&", Supervisor.none, 0, handlerMessageTrait) @@ -194,8 +191,8 @@ object RemoteSpec extends DefaultRunnableSpec { "zio://testSystem71@127.0.0.1:9677/actor-One-;_&" ) result <- actorRef ? Str("ZIO-Actor response... ") - } yield assert(result)(equalTo("ZIO-Actor response... received plus 01")) + } yield assertTrue(result == "ZIO-Actor response... received plus 01") } ) - ).provideCustomLayer(clock.Clock.live ++ environment.TestConsole.silent) + ) @@ TestAspect.withLiveClock @@ TestAspect.silent } diff --git a/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActor.scala b/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActor.scala index 9b06907d..d6db2296 100644 --- a/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActor.scala +++ b/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActor.scala @@ -1,7 +1,7 @@ package zio.actors.akka import akka.actor.typed -import zio.UIO +import zio.{ UIO, ZIO } object AkkaTypedActor { @@ -13,5 +13,5 @@ object AkkaTypedActor { * @return reference to the created proxy actor in effect that can't fail */ def make[F[+_]](actorRef: typed.ActorRef[F[_]]): UIO[AkkaTypedActorRefLocal[F]] = - UIO(new AkkaTypedActorRefLocal[F](actorRef.path.toString, actorRef)) + ZIO.succeed(new AkkaTypedActorRefLocal[F](actorRef.path.toString, actorRef)) } diff --git a/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActorRefLocal.scala b/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActorRefLocal.scala index 4c149144..cb5c1489 100644 --- a/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActorRefLocal.scala +++ b/akka-interop/src/main/scala/zio/actors/akka/AkkaTypedActorRefLocal.scala @@ -22,7 +22,7 @@ final class AkkaTypedActorRefLocal[-F[+_]] private[actors] ( * @param fa message * @return lifted unit */ - def !(fa: F[_]): Task[Unit] = UIO(akkaActor ! fa) + def !(fa: F[_]): Task[Unit] = ZIO.succeed(akkaActor ! fa) /** * Send a message to an actor as `ask` interaction pattern - @@ -40,5 +40,5 @@ final class AkkaTypedActorRefLocal[-F[+_]] private[actors] ( * * @return */ - val path: UIO[String] = UIO(actorName) + val path: UIO[String] = ZIO.succeed(actorName) } diff --git a/akka-interop/src/test/scala/zio/actors/akka/ActorsAkkaSpec.scala b/akka-interop/src/test/scala/zio/actors/akka/ActorsAkkaSpec.scala index cf03c713..7f67bad6 100644 --- a/akka-interop/src/test/scala/zio/actors/akka/ActorsAkkaSpec.scala +++ b/akka-interop/src/test/scala/zio/actors/akka/ActorsAkkaSpec.scala @@ -9,13 +9,13 @@ import zio.actors._ import zio.actors.akka.AkkaBehaviorsUtils._ import zio.test.Assertion._ import zio.test._ -import zio.{ IO, Runtime, ZIO } +import zio.{ IO, Runtime, Unsafe, ZIO } import scala.concurrent.duration._ object AkkaBehaviorsUtils { - sealed trait TypedMessage[+_] + sealed trait TypedMessage[+A] case object HelloFromZio extends TypedMessage[Unit] @@ -23,7 +23,7 @@ object AkkaBehaviorsUtils { case class PingToZio(zioReplyToActor: ActorRef[ZioMessage], msg: String) extends TypedMessage[Unit] - sealed trait ZioMessage[+_] + sealed trait ZioMessage[+A] case class Ping(akkaActor: AkkaTypedActorRefLocal[TypedMessage]) extends ZioMessage[Unit] @@ -38,8 +38,14 @@ object AkkaBehaviorsUtils { Behaviors.receiveMessage { message => message match { case HelloFromZio => () - case PingFromZio(zioSenderActor) => runtime.unsafeRun(zioSenderActor ! PongFromAkka("Pong from Akka")) - case PingToZio(zioReplyToActor, msgToZio) => runtime.unsafeRun(zioReplyToActor ! PongFromAkka(msgToZio)) + case PingFromZio(zioSenderActor) => + Unsafe.unsafe { implicit u => + runtime.unsafe.run(zioSenderActor ! PongFromAkka("Pong from Akka")).getOrThrowFiberFailure() + } + case PingToZio(zioReplyToActor, msgToZio) => + Unsafe.unsafe { implicit u => + runtime.unsafe.run(zioReplyToActor ! PongFromAkka(msgToZio)).getOrThrowFiberFailure() + } } Behaviors.same } @@ -48,10 +54,10 @@ object AkkaBehaviorsUtils { } object AskUtils { - sealed trait AskMessage[+_] + sealed trait AskMessage[+A] case class PingAsk(value: Int, replyTo: typed.ActorRef[Int]) extends AskMessage[Int] - sealed trait ZioMessage[+_] + sealed trait ZioMessage[+A] case class GetState(akkaActor: AkkaTypedActorRefLocal[AskMessage]) extends ZioMessage[Int] def PingAskDeferred(value: Int): typed.ActorRef[Int] => PingAsk = @@ -69,10 +75,10 @@ object AskUtils { } } -object ActorsAkkaSpec extends DefaultRunnableSpec { +object ActorsAkkaSpec extends ZIOSpecDefault { def spec = suite("Test the basic integration with akka typed actor behavior")( - testM("Send message from zioActor to akkaActor") { + test("Send message from zioActor to akkaActor") { import AkkaBehaviorsUtils._ val handler = new Stateful[Any, Int, ZioMessage] { @@ -83,20 +89,20 @@ object ActorsAkkaSpec extends DefaultRunnableSpec { ): ZIO[Any, Throwable, (Int, A)] = msg match { case Ping(akkaActor) => (akkaActor ! HelloFromZio).as((state, ())) - case _ => IO.fail(new Exception("fail")) + case _ => ZIO.fail(new Exception("fail")) } } val program = for { - typedActorSystem <- IO(typed.ActorSystem(TestBehavior(), "typedSystem")) + typedActorSystem <- ZIO.attempt(typed.ActorSystem(TestBehavior(), "typedSystem")) system <- ActorSystem("test1") zioActor <- system.make("actor1", Supervisor.none, 0, handler) akkaActor <- AkkaTypedActor.make(typedActorSystem) _ <- zioActor ! Ping(akkaActor) } yield () - assertM(program.run)(succeeds(anything)) + assertZIO(program.exit)(succeeds(anything)) }, - testM("Send message from akkaActor to zioActor") { + test("Send message from akkaActor to zioActor") { import AkkaBehaviorsUtils._ val handler = new Stateful[Any, String, ZioMessage] { override def receive[A]( @@ -105,20 +111,20 @@ object ActorsAkkaSpec extends DefaultRunnableSpec { context: Context ): IO[Throwable, (String, A)] = msg match { - case PongFromAkka(msg) => IO.succeed((msg, ())) - case _ => IO.fail(new Exception("fail")) + case PongFromAkka(msg) => ZIO.succeed((msg, ())) + case _ => ZIO.fail(new Exception("fail")) } } val program = for { - typedActorSystem <- IO(typed.ActorSystem(TestBehavior(), "typedSystem2")) + typedActorSystem <- ZIO.attempt(typed.ActorSystem(TestBehavior(), "typedSystem2")) system <- ActorSystem("test2") akkaActor <- AkkaTypedActor.make(typedActorSystem) zioActor <- system.make("actor2", Supervisor.none, "", handler) _ <- akkaActor ! PingToZio(zioActor, "Ping from Akka") } yield () - assertM(program.run)(succeeds(anything)) + assertZIO(program.exit)(succeeds(anything)) }, - testM("ZioActor send message to akkaActor and then replyTo to zioActor") { + test("ZioActor send message to akkaActor and then replyTo to zioActor") { val handler = new Stateful[Any, String, ZioMessage] { override def receive[A]( @@ -132,20 +138,20 @@ object ActorsAkkaSpec extends DefaultRunnableSpec { self <- context.self[ZioMessage] _ <- akkaActor ! PingFromZio(self) } yield (state, ()) - case PongFromAkka(msg) => IO.succeed((msg, ())) - case _ => IO.fail(new Exception("fail")) + case PongFromAkka(msg) => ZIO.succeed((msg, ())) + case _ => ZIO.fail(new Exception("fail")) } } val program = for { - typedActorSystem <- IO(typed.ActorSystem(TestBehavior(), "typedSystem3")) + typedActorSystem <- ZIO.attempt(typed.ActorSystem(TestBehavior(), "typedSystem3")) system <- ActorSystem("test3") zioActor <- system.make("actor3", Supervisor.none, "", handler) akkaActor <- AkkaTypedActor.make(typedActorSystem) _ <- zioActor ! Ping(akkaActor) } yield () - assertM(program.run)(succeeds(anything)) + assertZIO(program.exit)(succeeds(anything)) }, - testM("send ask message to akkaActor and get response") { + test("send ask message to akkaActor and get response") { import AskUtils._ @@ -160,9 +166,9 @@ object ActorsAkkaSpec extends DefaultRunnableSpec { for { akkaActor <- AkkaTypedActor.make(typedActorSystem) result <- akkaActor ? PingAskDeferred(1000) - } yield assert(result)(equalTo(1000)) + } yield assertTrue(result == 1000) }, - testM("send message to zioActor and ask akkaActor for the response") { + test("send message to zioActor and ask akkaActor for the response") { import AskUtils._ @@ -181,7 +187,7 @@ object ActorsAkkaSpec extends DefaultRunnableSpec { msg match { case GetState(akkaActor) => (akkaActor ? PingAskDeferred(1000)).map(newState => (newState, newState)) - case _ => IO.fail(new Exception("fail")) + case _ => ZIO.fail(new Exception("fail")) } } for { @@ -189,7 +195,7 @@ object ActorsAkkaSpec extends DefaultRunnableSpec { zioActor <- system.make("actor3", Supervisor.none, 0, handler) akkaActor <- AkkaTypedActor.make(typedActorSystem) result <- zioActor ? GetState(akkaActor) - } yield assert(result)(equalTo(1000)) + } yield assertTrue(result == 1000) } ) } diff --git a/build.sbt b/build.sbt index d0ecc248..03e1d1cd 100644 --- a/build.sbt +++ b/build.sbt @@ -31,11 +31,11 @@ inThisBuild( addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") -val zioVersion = "1.0.4" -val zioNioVersion = "1.0.0-RC9" -val zioConfigVersion = "1.0.0-RC30-1" -val zioInteropCatsVersion = "2.4.1.0" -val akkaActorTypedVersion = "2.6.18" +val zioVersion = "2.0.2" +val zioNioVersion = "2.0.0" +val zioConfigVersion = "3.0.2" +val zioInteropCatsVersion = "22.0.0.0" +val akkaActorTypedVersion = "2.6.19" val doobieVersion = "0.13.4" lazy val root = diff --git a/examples/src/main/scala/zio/actors/examples/persistence/ShoppingCart.scala b/examples/src/main/scala/zio/actors/examples/persistence/ShoppingCart.scala index 8fd100df..b1cf85d9 100644 --- a/examples/src/main/scala/zio/actors/examples/persistence/ShoppingCart.scala +++ b/examples/src/main/scala/zio/actors/examples/persistence/ShoppingCart.scala @@ -1,12 +1,10 @@ package zio.actors.examples.persistence import java.time.Instant - -import zio.UIO +import zio.{ UIO, ZIO } import zio.actors.persistence.PersistenceId.PersistenceId import zio.actors.{ persistence, Context } import zio.actors.persistence._ -import zio.clock.Clock /** * This is a full example of [[https://github.com/akka/akka-samples/tree/2.6/akka-sample-persistence-scala Akka persistence shopping cart]] @@ -44,7 +42,7 @@ object ShoppingCart { val empty = State(items = Map.empty, checkoutDate = None) } - sealed trait Command[+_] + sealed trait Command[+A] final case class AddItem(itemId: String, quantity: Int) extends Command[Confirmation] final case class RemoveItem(itemId: String) extends Command[Confirmation] final case class AdjustItemQuantity(itemId: String, quantity: Int) extends Command[Confirmation] @@ -66,8 +64,8 @@ object ShoppingCart { final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event final case class CheckedOut(cartId: String, eventTime: Instant) extends Event - def apply(cartId: String): EventSourcedStateful[Clock, State, Command, Event] = - new EventSourcedStateful[Clock, State, Command, Event](PersistenceId(cartId)) { + def apply(cartId: String): EventSourcedStateful[Any, State, Command, Event] = + new EventSourcedStateful[Any, State, Command, Event](PersistenceId(cartId)) { override def receive[A]( state: State, msg: Command[A], @@ -94,36 +92,42 @@ object ShoppingCart { command match { case AddItem(itemId, quantity) => if (state.hasItem(itemId)) - UIO((Command.ignore, _ => Rejected(s"Item '$itemId' was already added to this shopping cart"))) + ZIO.succeed((Command.ignore, _ => Rejected(s"Item '$itemId' was already added to this shopping cart"))) else if (quantity <= 0) - UIO((Command.ignore, _ => Rejected("Quantity must be greater than zero"))) + ZIO.succeed((Command.ignore, _ => Rejected("Quantity must be greater than zero"))) else - UIO((Command.persist(ItemAdded(cartId, itemId, quantity)), updatedState => Accepted(updatedState.toSummary))) + ZIO.succeed( + (Command.persist(ItemAdded(cartId, itemId, quantity)), updatedState => Accepted(updatedState.toSummary)) + ) case RemoveItem(itemId) => if (state.hasItem(itemId)) - UIO((Command.persist(ItemRemoved(cartId, itemId)), updatedState => Accepted(updatedState.toSummary))) + ZIO.succeed((Command.persist(ItemRemoved(cartId, itemId)), updatedState => Accepted(updatedState.toSummary))) else - UIO((Command.ignore, _ => Accepted(state.toSummary))) + ZIO.succeed((Command.ignore, _ => Accepted(state.toSummary))) case AdjustItemQuantity(itemId, quantity) => if (quantity <= 0) - UIO((Command.ignore, _ => Rejected("Quantity must be greater than zero"))) + ZIO.succeed((Command.ignore, _ => Rejected("Quantity must be greater than zero"))) else if (state.hasItem(itemId)) - UIO( + ZIO.succeed( ( Command.persist(ItemQuantityAdjusted(cartId, itemId, quantity)), updatedCart => Accepted(updatedCart.toSummary) ) ) else - UIO((Command.ignore, _ => Rejected(s"Cannot adjust quantity for item '$itemId'. Item not present on cart"))) + ZIO.succeed( + (Command.ignore, _ => Rejected(s"Cannot adjust quantity for item '$itemId'. Item not present on cart")) + ) case Checkout => if (state.isEmpty) - UIO((Command.ignore, _ => Rejected("Cannot checkout an empty shopping cart"))) + ZIO.succeed((Command.ignore, _ => Rejected("Cannot checkout an empty shopping cart"))) else - UIO((Command.persist(CheckedOut(cartId, Instant.now())), updatedCart => Accepted(updatedCart.toSummary))) + ZIO.succeed( + (Command.persist(CheckedOut(cartId, Instant.now())), updatedCart => Accepted(updatedCart.toSummary)) + ) case Get => - UIO((Command.ignore, _ => state.toSummary)) + ZIO.succeed((Command.ignore, _ => state.toSummary)) } private def checkedOutShoppingCart[A]( @@ -133,15 +137,21 @@ object ShoppingCart { ): UIO[(persistence.Command[Event], State => A)] = command match { case Get => - UIO((Command.ignore, _ => state.toSummary)) + ZIO.succeed((Command.ignore, _ => state.toSummary)) case _: AddItem => - UIO((Command.ignore, _ => Rejected(s"Can't add an item to an already checked out $cartId shopping cart"))) + ZIO.succeed( + (Command.ignore, _ => Rejected(s"Can't add an item to an already checked out $cartId shopping cart")) + ) case _: RemoveItem => - UIO((Command.ignore, _ => Rejected(s"Can't remove an item from an already checked out $cartId shopping cart"))) + ZIO.succeed( + (Command.ignore, _ => Rejected(s"Can't remove an item from an already checked out $cartId shopping cart")) + ) case _: AdjustItemQuantity => - UIO((Command.ignore, _ => Rejected(s"Can't adjust item on an already checked out $cartId shopping cart"))) + ZIO.succeed( + (Command.ignore, _ => Rejected(s"Can't adjust item on an already checked out $cartId shopping cart")) + ) case Checkout => - UIO((Command.ignore, _ => Rejected(s"Can't checkout already checked out $cartId shopping cart"))) + ZIO.succeed((Command.ignore, _ => Rejected(s"Can't checkout already checked out $cartId shopping cart"))) } diff --git a/examples/src/test/scala/zio/actors/examples/persistence/ShoppingCartSpec.scala b/examples/src/test/scala/zio/actors/examples/persistence/ShoppingCartSpec.scala index c1a0c529..61c8c9bb 100644 --- a/examples/src/test/scala/zio/actors/examples/persistence/ShoppingCartSpec.scala +++ b/examples/src/test/scala/zio/actors/examples/persistence/ShoppingCartSpec.scala @@ -1,94 +1,94 @@ package zio.actors.examples.persistence -import java.io.File -import java.util.concurrent.TimeUnit - -import zio.{ Cause, Has, Managed, Schedule, UIO, ZIO, ZLayer } +import zio.actors.examples.persistence.Deps._ +import zio.actors.examples.persistence.ShoppingCart._ import zio.actors.{ ActorSystem, Supervisor } -import zio.duration.Duration import zio.test.Assertion._ -import zio.test.{ assert, suite, testM, DefaultRunnableSpec, TestFailure } -import ShoppingCart._ -import Deps._ +import zio.test.{ assert, assertTrue, ZIOSpecDefault } +import zio.{ Duration, Schedule, Scope, ZIO, ZLayer } + +import java.io.File +import java.util.concurrent.TimeUnit object Deps { - val actorSystem: Managed[TestFailure[Throwable], ActorSystem] = - Managed.make( + val actorSystem: ZIO[Scope, Throwable, ActorSystem] = + ZIO.acquireRelease( ActorSystem("testSys", Some(new File("./src/test/resources/application.conf"))) - .mapError(e => TestFailure.Runtime(Cause.die(e))) - )(_.shutdown.catchAll(_ => UIO.unit)) + )(_.shutdown.ignore) - val actorSystemLayer = ZLayer.fromManaged(actorSystem) + val actorSystemLayer = ZLayer.scoped(actorSystem) val recoverPolicy = Supervisor.retry(Schedule.exponential(Duration(200, TimeUnit.MILLISECONDS))) } -object ShoppingCartSpec extends DefaultRunnableSpec { +object ShoppingCartSpec extends ZIOSpecDefault { def spec = suite("The Shopping Cart should")( - testM("add item") { + test("add item") { for { - as <- ZIO.environment[Has[ActorSystem]].map(_.get) + as <- ZIO.service[ActorSystem] cart <- as.make("cart1", recoverPolicy, State.empty, ShoppingCart("cart1")) conf <- cart ? AddItem("foo", 42) - } yield assert(conf)(equalTo(Accepted(Summary(Map("foo" -> 42), checkedOut = false)))) + } yield assertTrue(conf == Accepted(Summary(Map("foo" -> 42), checkedOut = false))) }, - testM("reject already added item") { + test("reject already added item") { for { - as <- ZIO.environment[Has[ActorSystem]].map(_.get) + as <- ZIO.service[ActorSystem] cart <- as.make("cart2", recoverPolicy, State.empty, ShoppingCart("cart2")) conf1 <- cart ? AddItem("foo", 42) conf2 <- cart ? AddItem("foo", 13) } yield assert(conf1)(isSubtype[Accepted](anything)) && assert(conf2)(isSubtype[Rejected](anything)) }, - testM("remove item") { + test("remove item") { for { - as <- ZIO.environment[Has[ActorSystem]].map(_.get) + as <- ZIO.service[ActorSystem] cart <- as.make("cart3", recoverPolicy, State.empty, ShoppingCart("cart3")) conf1 <- cart ? AddItem("foo", 42) conf2 <- cart ? RemoveItem("foo") - } yield assert(conf1)(isSubtype[Accepted](anything)) && assert(conf2)( - equalTo(Accepted(Summary(Map.empty, checkedOut = false))) + } yield assert(conf1)(isSubtype[Accepted](anything)) && assertTrue( + conf2 == Accepted(Summary(Map.empty, checkedOut = false)) ) }, - testM("adjust quantity") { + test("adjust quantity") { for { - as <- ZIO.environment[Has[ActorSystem]].map(_.get) + as <- ZIO.service[ActorSystem] cart <- as.make("cart4", recoverPolicy, State.empty, ShoppingCart("cart4")) conf1 <- cart ? AddItem("foo", 42) conf2 <- cart ? AdjustItemQuantity("foo", 43) - } yield assert(conf1)(isSubtype[Accepted](anything)) && assert(conf2)( - equalTo(Accepted(Summary(Map("foo" -> 43), checkedOut = false))) + } yield assert(conf1)(isSubtype[Accepted](anything)) && assertTrue( + conf2 == Accepted(Summary(Map("foo" -> 43), checkedOut = false)) ) }, - testM("checkout") { + test("checkout") { for { - as <- ZIO.environment[Has[ActorSystem]].map(_.get) + as <- ZIO.service[ActorSystem] cart <- as.make("cart5", recoverPolicy, State.empty, ShoppingCart("cart5")) conf1 <- cart ? AddItem("foo", 42) conf2 <- cart ? Checkout - } yield assert(conf1)(isSubtype[Accepted](anything)) && assert(conf2)( - equalTo(Accepted(Summary(Map("foo" -> 42), checkedOut = true))) + } yield assert(conf1)(isSubtype[Accepted](anything)) && assertTrue( + conf2 == Accepted(Summary(Map("foo" -> 42), checkedOut = true)) ) }, - testM("keep its state") { + test("keep its state") { for { - as <- ZIO.environment[Has[ActorSystem]].map(_.get) + as <- ZIO.service[ActorSystem] cart <- as.make("cart6", recoverPolicy, State.empty, ShoppingCart("cart6")) conf1 <- cart ? AddItem("foo", 42) _ <- cart.stop cart <- as.make("cart6", recoverPolicy, State.empty, ShoppingCart("cart6")) status <- cart ? Get - } yield assert(conf1)(equalTo(Accepted(Summary(Map("foo" -> 42), checkedOut = false)))) && - assert(status)(equalTo(Summary(Map("foo" -> 42), checkedOut = false))) + } yield assertTrue( + conf1 == Accepted(Summary(Map("foo" -> 42), checkedOut = false)), + status == Summary(Map("foo" -> 42), checkedOut = false) + ) } - ).provideCustomLayer(actorSystemLayer) + ).provide(actorSystemLayer) } diff --git a/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCConfig.scala b/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCConfig.scala index 5deceb61..cc1ac0a7 100644 --- a/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCConfig.scala +++ b/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCConfig.scala @@ -14,9 +14,9 @@ private[actors] object JDBCConfig { val dbConfig: ConfigDescriptor[DbConfig] = nested("persistence") { - (string("url").xmap[DbURL](DbURL, _.value) |@| - string("user").xmap[DbUser](DbUser, _.value) |@| - string("pass").xmap[DbPass](DbPass, _.value))(DbConfig.apply, DbConfig.unapply) + (string("url").transform[DbURL](DbURL, _.value) zip + string("user").transform[DbUser](DbUser, _.value) zip + string("pass").transform[DbPass](DbPass, _.value)).to[DbConfig] } def getDbConfig(systemName: String, configStr: String): Task[DbConfig] = diff --git a/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCJournal.scala b/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCJournal.scala index 22a46bb6..8b262386 100644 --- a/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCJournal.scala +++ b/persistence-jdbc/src/main/scala/zio/actors/persistence/jdbc/JDBCJournal.scala @@ -5,13 +5,12 @@ import com.zaxxer.hikari.HikariDataSource import doobie._ import doobie.hikari.HikariTransactor import doobie.implicits._ -import zio.{ IO, Promise, Runtime, Task, UIO, ZIO } import zio.actors.ActorSystemUtils import zio.actors.persistence.PersistenceId.PersistenceId -import zio.actors.persistence.journal.{ Journal, JournalFactory } import zio.actors.persistence.jdbc.JDBCConfig.DbConfig -import zio.blocking.Blocking +import zio.actors.persistence.journal.{ Journal, JournalFactory } import zio.interop.catz._ +import zio.{ Promise, Runtime, Task, Unsafe, ZIO } private[actors] final class JDBCJournal[Ev](tnx: Transactor[Task]) extends Journal[Ev] { @@ -24,7 +23,7 @@ private[actors] final class JDBCJournal[Ev](tnx: Transactor[Task]) extends Journ override def getEvents(persistenceId: PersistenceId): Task[Seq[Ev]] = for { bytes <- SqlEvents.getEventsById(persistenceId).to[Seq].transact(tnx) - events <- IO.collectAll(bytes.map(ActorSystemUtils.objFromByteArray(_).map(_.asInstanceOf[Ev]))) + events <- ZIO.collectAll(bytes.map(ActorSystemUtils.objFromByteArray(_).map(_.asInstanceOf[Ev]))) } yield events } @@ -32,7 +31,10 @@ private[actors] final class JDBCJournal[Ev](tnx: Transactor[Task]) extends Journ object JDBCJournal extends JournalFactory { private lazy val runtime = Runtime.default - private lazy val transactorPromise = runtime.unsafeRun(Promise.make[Exception, HikariTransactor[Task]]) + private lazy val transactorPromise = + Unsafe.unsafe { implicit u => + runtime.unsafe.run(Promise.make[Exception, HikariTransactor[Task]]).getOrThrowFiberFailure() + } def getJournal[Ev](actorSystemName: String, configStr: String): Task[JDBCJournal[Ev]] = for { @@ -40,16 +42,16 @@ object JDBCJournal extends JournalFactory { tnx <- getTransactor(dbConfig) } yield new JDBCJournal[Ev](tnx) - private def makeTransactor(dbConfig: DbConfig): ZIO[Blocking, Throwable, HikariTransactor[Task]] = - ZIO.runtime[Blocking].flatMap { implicit rt => + private def makeTransactor(dbConfig: DbConfig): ZIO[Any, Throwable, HikariTransactor[Task]] = + ZIO.runtime[Any].flatMap { implicit rt => for { - transactEC <- UIO(rt.environment.get.blockingExecutor.asEC) - connectEC = rt.platform.executor.asEC + transactEC <- ZIO.blockingExecutor.map(_.asExecutionContext) + connectEC <- ZIO.executor.map(_.asExecutionContext) ds = new HikariDataSource() _ = ds.setJdbcUrl(dbConfig.dbURL.value) _ = ds.setUsername(dbConfig.dbUser.value) _ = ds.setPassword(dbConfig.dbPass.value) - transactor <- IO.effect(HikariTransactor.apply[Task](ds, connectEC, Blocker.liftExecutionContext(transactEC))) + transactor <- ZIO.attempt(HikariTransactor.apply[Task](ds, connectEC, Blocker.liftExecutionContext(transactEC))) } yield transactor } @@ -58,7 +60,7 @@ object JDBCJournal extends JournalFactory { case Some(value) => value case None => for { - newTnx <- makeTransactor(dbConfig).provideLayer(Blocking.live) + newTnx <- makeTransactor(dbConfig) _ <- transactorPromise.succeed(newTnx) } yield newTnx } diff --git a/persistence/src/main/scala/zio/actors/persistence/EventSource.scala b/persistence/src/main/scala/zio/actors/persistence/EventSource.scala index 4d504ee1..25bf2e4d 100644 --- a/persistence/src/main/scala/zio/actors/persistence/EventSource.scala +++ b/persistence/src/main/scala/zio/actors/persistence/EventSource.scala @@ -1,12 +1,12 @@ package zio.actors.persistence -import scala.reflect.runtime.universe -import zio.actors.{ Actor, Context, Supervisor } -import zio.{ IO, Queue, RIO, Ref, Task } import zio.actors.Actor._ +import zio.actors.persistence.PersistenceId._ import zio.actors.persistence.journal.{ Journal, JournalFactory } -import PersistenceId._ -import zio.clock.Clock +import zio.actors.{ Actor, Context, Supervisor } +import zio.{ Queue, RIO, Ref, Task, ZIO } + +import scala.reflect.runtime.universe /** * Each message can result in either an event that will be persisted or idempotent action. @@ -49,33 +49,36 @@ abstract class EventSourcedStateful[R, S, -F[+_], Ev](persistenceId: Persistence context: Context, optOutActorSystem: () => Task[Unit], mailboxSize: Int = DefaultActorMailboxSize - )(initial: S): RIO[R with Clock, Actor[F]] = { + )(initial: S): RIO[R, Actor[F]] = { val mirror = universe.runtimeMirror(getClass.getClassLoader) def retrieveJournal: Task[Journal[Ev]] = for { - configStr <- IO + configStr <- ZIO .fromOption(context.actorSystemConfig) - .mapError(_ => new Exception("Couldn't retrieve persistence config")) + .orElseFail(new Exception("Couldn't retrieve persistence config")) systemName = context.actorSystemName pluginClass <- PersistenceConfig.getPluginClass(systemName, configStr) - maybeFactory <- Task(mirror.reflectModule(mirror.staticModule(pluginClass.value)).instance).mapError(e => - new IllegalArgumentException(s"Could not load plugin class $pluginClass from $configStr", e) - ) + maybeFactory <- + ZIO + .attempt(mirror.reflectModule(mirror.staticModule(pluginClass.value)).instance) + .mapError(e => new IllegalArgumentException(s"Could not load plugin class $pluginClass from $configStr", e)) factory <- - Task(maybeFactory.asInstanceOf[JournalFactory]).mapError(e => - new IllegalArgumentException( - s"Plugin class $maybeFactory from $configStr is not a ${classOf[JournalFactory].getCanonicalName}", - e + ZIO + .attempt(maybeFactory.asInstanceOf[JournalFactory]) + .mapError(e => + new IllegalArgumentException( + s"Plugin class $maybeFactory from $configStr is not a ${classOf[JournalFactory].getCanonicalName}", + e + ) ) - ) journal <- factory.getJournal[Ev](systemName, configStr) } yield journal def applyEvents(events: Seq[Ev], state: S): S = events.foldLeft(state)(sourceEvent) - def process[A](msg: PendingMessage[F, A], state: Ref[S], journal: Journal[Ev]): RIO[R with Clock, Unit] = + def process[A](msg: PendingMessage[F, A], state: Ref[S], journal: Journal[Ev]): RIO[R, Unit] = for { s <- state.get (fa, promise) = msg @@ -97,11 +100,11 @@ abstract class EventSourcedStateful[R, S, -F[+_], Ev](persistenceId: Persistence } yield res } ).tupled - _ <- receiver.foldM( + _ <- receiver.foldZIO( e => supervisor .supervise(receiver, e) - .foldM(_ => promise.fail(e), fullCompleter), + .foldZIO(_ => promise.fail(e), fullCompleter), fullCompleter ) } yield () diff --git a/persistence/src/main/scala/zio/actors/persistence/PersistenceConfig.scala b/persistence/src/main/scala/zio/actors/persistence/PersistenceConfig.scala index 078c8452..1b0f6488 100644 --- a/persistence/src/main/scala/zio/actors/persistence/PersistenceConfig.scala +++ b/persistence/src/main/scala/zio/actors/persistence/PersistenceConfig.scala @@ -1,14 +1,16 @@ package zio.actors.persistence -import zio.{ IO, Managed, Promise, Runtime, Task, UIO } +import zio.actors.ActorsConfig._ import zio.config.ConfigDescriptor import zio.config.ConfigDescriptor._ -import zio.actors.ActorsConfig._ +import zio.{ Promise, Runtime, Task, Unsafe, ZIO } private[actors] object PersistenceConfig { private lazy val runtime = Runtime.default - private lazy val promise = runtime.unsafeRun(Promise.make[Exception, String]) + private lazy val promise = Unsafe.unsafe { implicit u => + runtime.unsafe.run(Promise.make[Exception, String]).getOrThrowFiberFailure() + } final case class JournalPluginRaw(value: String) extends AnyVal final case class JournalPluginClass(value: String) extends AnyVal @@ -17,19 +19,19 @@ private[actors] object PersistenceConfig { val pluginConfig: ConfigDescriptor[JournalPluginRaw] = nested("persistence") { - string("plugin").xmap(JournalPluginRaw, _.value) + string("plugin").transform(JournalPluginRaw, _.value) } def classPathConfig(pluginClass: String): ConfigDescriptor[JournalPluginClass] = nested("persistence") { nested("datastores") { - string(pluginClass).xmap(JournalPluginClass, _.value) + string(pluginClass).transform(JournalPluginClass, _.value) } } val inMemConfig: ConfigDescriptor[InMemConfig] = nested("persistence") { - string("key").xmap(InMemConfig, _.key) + string("key").transform(InMemConfig, _.key) } def getPluginClass(systemName: String, configStr: String): Task[JournalPluginClass] = @@ -43,9 +45,13 @@ private[actors] object PersistenceConfig { value case None => for { - inputStream <- IO(getClass.getResourceAsStream("/datastores.conf")) - source <- IO(scala.io.Source.fromInputStream(inputStream)) - str <- Managed.make(IO(source))(s => UIO(s.close())).use(s => IO(s.mkString)) + inputStream <- ZIO.attempt(getClass.getResourceAsStream("/datastores.conf")) + source <- ZIO.attempt(scala.io.Source.fromInputStream(inputStream)) + str <- ZIO.scoped { + ZIO + .acquireRelease(ZIO.succeed(source))(s => ZIO.succeed(s.close())) + .flatMap(s => ZIO.attempt(s.mkString)) + } _ <- promise.succeed(str) } yield str } diff --git a/persistence/src/main/scala/zio/actors/persistence/journal/InMemJournal.scala b/persistence/src/main/scala/zio/actors/persistence/journal/InMemJournal.scala index 786993f6..35d86998 100644 --- a/persistence/src/main/scala/zio/actors/persistence/journal/InMemJournal.scala +++ b/persistence/src/main/scala/zio/actors/persistence/journal/InMemJournal.scala @@ -1,6 +1,6 @@ package zio.actors.persistence.journal -import zio.{ Ref, Runtime, Task, UIO } +import zio.{ Ref, Runtime, Task, UIO, Unsafe, ZIO } import zio.actors.persistence.PersistenceId.PersistenceId import zio.actors.persistence.PersistenceConfig import InMemJournal.JournalRow @@ -33,7 +33,10 @@ object InMemJournal extends JournalFactory { j <- Ref.make(Map.empty[String, InMemJournal[_]]) _ <- j.set(Map.empty) } yield j - runtime.unsafeRun(journalEff) + + Unsafe.unsafe { implicit u => + runtime.unsafe.run(journalEff).getOrThrowFiberFailure() + } } def getJournal[Ev](actorSystemName: String, configStr: String): Task[InMemJournal[Ev]] = @@ -43,7 +46,7 @@ object InMemJournal extends JournalFactory { map <- journalMap.get journal <- map.get(key) match { case Some(j) => - UIO.effectTotal(j.asInstanceOf[InMemJournal[Ev]]) + ZIO.succeed(j.asInstanceOf[InMemJournal[Ev]]) case None => for { j <- InMemJournal.make[Ev]() diff --git a/persistence/src/test/scala/zio/actors/persistence/PersistenceSpec.scala b/persistence/src/test/scala/zio/actors/persistence/PersistenceSpec.scala index a288df2d..dfda13e0 100644 --- a/persistence/src/test/scala/zio/actors/persistence/PersistenceSpec.scala +++ b/persistence/src/test/scala/zio/actors/persistence/PersistenceSpec.scala @@ -1,17 +1,15 @@ package zio.actors.persistence import java.io.File - import zio.actors.{ ActorSystem, Context, Supervisor } -import zio.UIO -import zio.test.DefaultRunnableSpec +import zio.{ UIO, ZIO } import zio.test._ import zio.test.Assertion._ import CounterUtils._ import SpecUtils._ object CounterUtils { - sealed trait Message[+_] + sealed trait Message[+A] case object Reset extends Message[Unit] case object Increase extends Message[Unit] case object Get extends Message[Int] @@ -30,9 +28,9 @@ object SpecUtils { context: Context ): UIO[(Command[CounterEvent], Int => A)] = msg match { - case Reset => UIO((Command.persist(ResetEvent), _ => ())) - case Increase => UIO((Command.persist(IncreaseEvent), _ => ())) - case Get => UIO((Command.ignore, _ => state)) + case Reset => ZIO.succeed((Command.persist(ResetEvent), _ => ())) + case Increase => ZIO.succeed((Command.persist(IncreaseEvent), _ => ())) + case Get => ZIO.succeed((Command.ignore, _ => state)) } override def sourceEvent(state: Int, event: CounterEvent): Int = @@ -45,11 +43,11 @@ object SpecUtils { val configFile = Some(new File("./persistence/src/test/resources/application.conf")) } -object PersistenceSpec extends DefaultRunnableSpec { +object PersistenceSpec extends ZIOSpecDefault { def spec = suite("PersistenceSpec")( suite("Basic persistence operation")( - testM("Restarting persisted actor") { + test("Restarting persisted actor") { for { actorSystem <- ActorSystem("testSystem1", configFile) actor <- actorSystem.make("actor1", Supervisor.none, 0, ESCounterHandler) @@ -59,15 +57,15 @@ object PersistenceSpec extends DefaultRunnableSpec { actor <- actorSystem.make("actor1", Supervisor.none, 0, ESCounterHandler) _ <- actor ! Increase counter <- actor ? Get - } yield assert(counter)(equalTo(3)) + } yield assertTrue(counter == 3) }, - testM("Corrupt plugin config name") { + test("Corrupt plugin config name") { val program = for { as <- ActorSystem("testSystem3", configFile) _ <- as.make("actor1", Supervisor.none, 0, ESCounterHandler) } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails( hasField[Throwable, Boolean]( @@ -78,13 +76,13 @@ object PersistenceSpec extends DefaultRunnableSpec { ) ) }, - testM("Plugin with a non-existing factory class") { + test("Plugin with a non-existing factory class") { val program = for { as <- ActorSystem("testSystem4", configFile) _ <- as.make("actor1", Supervisor.none, 0, ESCounterHandler) } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails( hasField[Throwable, Boolean]( @@ -95,13 +93,13 @@ object PersistenceSpec extends DefaultRunnableSpec { ) ) }, - testM("Plugin with an incorrect factory") { + test("Plugin with an incorrect factory") { val program = for { as <- ActorSystem("testSystem5", configFile) _ <- as.make("actor1", Supervisor.none, 0, ESCounterHandler) } yield () - assertM(program.run)( + assertZIO(program.exit)( fails(isSubtype[Throwable](anything)) && fails( hasField[Throwable, Boolean]( diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala index 5d219bf3..4052d5aa 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -5,8 +5,8 @@ import BuildInfoKeys._ object BuildHelper { private val Scala212 = "2.12.15" - private val Scala213 = "2.13.3" - private val SilencerVersion = "1.7.7" + private val Scala213 = "2.13.8" + private val SilencerVersion = "1.7.9" private val stdOptions = Seq( "-encoding", @@ -31,6 +31,7 @@ object BuildHelper { "-Wunused:patvars", "-Wunused:privates", "-Wunused:params", + "-Xlint:-infer-any", "-Wvalue-discard" ) @@ -39,10 +40,10 @@ object BuildHelper { "-Ypartial-unification", "-Ywarn-nullary-override", "-Yno-adapted-args", - "-Ywarn-infer-any", "-Ywarn-inaccessible", "-Ywarn-nullary-unit", - "-Ywarn-unused-import" + "-Ywarn-unused-import", + "-Xlint:-infer-any" ) private def extraOptions(scalaVersion: String) =