Skip to content

Commit

Permalink
Upgrade to ZIO 2 (#453)
Browse files Browse the repository at this point in the history
* Upgrade to ZIO 2

* Fix 2.12

* Fix 2.12
  • Loading branch information
vigoo authored Oct 1, 2022
1 parent 1560ff2 commit fd5f543
Show file tree
Hide file tree
Showing 21 changed files with 391 additions and 354 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
fail-fast: false
matrix:
java: ['[email protected]', '[email protected]']
scala: ['2.12.15', '2.13.3']
scala: ['2.12.15', '2.13.8']
steps:
- uses: actions/[email protected]
- uses: olafurpg/setup-scala@v10
Expand Down
13 changes: 6 additions & 7 deletions actors/src/main/scala/zio/actors/Actor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package zio.actors

import zio.actors.Actor.PendingMessage
import zio.clock.Clock
import zio.{ Supervisor => _, _ }

object Actor {
Expand Down Expand Up @@ -35,19 +34,19 @@ object Actor {
context: Context,
optOutActorSystem: () => Task[Unit],
mailboxSize: Int = DefaultActorMailboxSize
)(initial: S): URIO[R with Clock, Actor[F]] = {
)(initial: S): URIO[R, Actor[F]] = {

def process[A](msg: PendingMessage[F, A], state: Ref[S]): URIO[R with Clock, Unit] =
def process[A](msg: PendingMessage[F, A], state: Ref[S]): URIO[R, Unit] =
for {
s <- state.get
(fa, promise) = msg
receiver = receive(s, fa, context)
completer = ((s: S, a: A) => state.set(s) *> promise.succeed(a)).tupled
_ <- receiver.foldM(
_ <- receiver.foldZIO(
e =>
supervisor
.supervise(receiver, e)
.foldM(_ => promise.fail(e), completer),
.foldZIO(_ => promise.fail(e), completer),
completer
)
} yield ()
Expand All @@ -70,7 +69,7 @@ object Actor {
context: Context,
optOutActorSystem: () => Task[Unit],
mailboxSize: Int = DefaultActorMailboxSize
)(initial: S): RIO[R with Clock, Actor[F]]
)(initial: S): RIO[R, Actor[F]]

}

Expand Down Expand Up @@ -104,7 +103,7 @@ private[actors] final class Actor[-F[+_]](
this.stop
}

val stop: Task[List[_]] =
val stop: Task[Chunk[_]] =
for {
tall <- queue.takeAll
_ <- queue.shutdown
Expand Down
44 changes: 24 additions & 20 deletions actors/src/main/scala/zio/actors/ActorRef.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package zio.actors

import java.io.{ IOException, ObjectInputStream, ObjectOutputStream, ObjectStreamException }
import zio.nio.channels.AsynchronousSocketChannel
import zio.nio.{ InetAddress, InetSocketAddress }
import zio.{ Chunk, Runtime, Task, UIO, Unsafe, ZIO }

import zio.nio.core.{ InetAddress, InetSocketAddress, SocketAddress }
import zio.nio.core.channels.AsynchronousSocketChannel
import zio.{ IO, Runtime, Task, UIO }
import java.io.{ IOException, ObjectInputStream, ObjectOutputStream, ObjectStreamException }

/**
* Reference to actor that might reside on local JVM instance or be available via remote communication
Expand Down Expand Up @@ -41,7 +41,7 @@ sealed trait ActorRef[-F[+_]] extends Serializable {
/**
* Stops actor and all its children
*/
val stop: Task[List[_]]
val stop: Task[Chunk[_]]

}

Expand Down Expand Up @@ -73,13 +73,15 @@ private[actors] sealed abstract class ActorRefSerial[-F[+_]](private var actorPa
(_, addr, port, _) = resolved
address <- InetAddress
.byName(addr.value)
.flatMap(iAddr => SocketAddress.inetSocketAddress(iAddr, port.value))
.flatMap(iAddr => InetSocketAddress.inetAddress(iAddr, port.value))
} yield new ActorRefRemote[F](actorPath, address)

ActorRefSerial.runtimeForResolve.unsafeRun(remoteRef)
Unsafe.unsafe { implicit u =>
ActorRefSerial.runtimeForResolve.unsafe.run(remoteRef).getOrThrowFiberFailure()
}
}

override val path: UIO[String] = UIO(actorPath)
override val path: UIO[String] = ZIO.succeed(actorPath)
}

private[actors] final class ActorRefLocal[-F[+_]](
Expand All @@ -90,7 +92,7 @@ private[actors] final class ActorRefLocal[-F[+_]](

override def !(fa: F[_]): Task[Unit] = actor ! fa

override val stop: Task[List[_]] = actor.stop
override val stop: Task[Chunk[_]] = actor.stop

@throws[IOException]
private def writeObject(out: ObjectOutputStream): Unit =
Expand All @@ -115,19 +117,21 @@ private[actors] final class ActorRefRemote[-F[+_]](

override def !(fa: F[_]): Task[Unit] = sendEnvelope[Unit](Command.Tell(fa))

override val stop: Task[List[_]] = sendEnvelope(Command.Stop)
override val stop: Task[Chunk[_]] = sendEnvelope(Command.Stop)

private def sendEnvelope[A](command: Command): Task[A] =
for {
client <- AsynchronousSocketChannel()
response <- for {
_ <- client.connect(address)
actorPath <- path
_ <- writeToWire(client, new Envelope(command, actorPath))
response <- readFromWire(client)
} yield response.asInstanceOf[Either[Throwable, A]]
result <- IO.fromEither(response)
} yield result
ZIO.scoped {
for {
client <- AsynchronousSocketChannel.open
response <- for {
_ <- client.connect(address)
actorPath <- path
_ <- writeToWire(client, new Envelope(command, actorPath))
response <- readFromWire(client)
} yield response.asInstanceOf[Either[Throwable, A]]
result <- ZIO.fromEither(response)
} yield result
}

@throws[IOException]
private def writeObject(out: ObjectOutputStream): Unit =
Expand Down
Loading

0 comments on commit fd5f543

Please sign in to comment.