diff --git a/README.md b/README.md index a808486ff..b4389c637 100644 --- a/README.md +++ b/README.md @@ -188,6 +188,8 @@ assert(triesFirst(Tries.get(Failure(ex))) == Some(Failure(ex))) ## Core Effects +Kyo's core effects act as the essential building blocks that power your application's various functionalities. Unlike other libraries that might require heavy boilerplate or specialized knowledge, Kyo's core effects are designed to be straightforward and flexible. These core effects not only simplify the management of side-effects, dependencies, and several other aspects but also allow for a modular approach to building maintainable systems. + ### Aborts: Short Circuiting The `Aborts` effect is a generic implementation for short-circuiting effects. It's equivalent to ZIO's failure channel. @@ -667,7 +669,217 @@ val d: Unit > IOs = > Important: The `Loggers` effect chooses to consider the initialization of a `Logger` instance as being pure even though it may perform side effects. `Logger` instances need to be stored in constant fields for good performance, something not trivial to achieve if `Loggers.init` required an `IOs` suspension. -### Lists: +## Concurrent Effects + +The `kyo.concurrent` package provides utilities for dealing with concurrency in Scala applications. It's a powerful set of effects designed for easier asynchronous programming, built on top of other core functionalities provided by the `kyo` package. + +### Fibers: Managed Green Threads + +The `Fibers` effect can fork a computation to execute asynchronously in Kyo's managed thread pool. The `forkFiber` method is the fundamental building block of the effect's features. It takes a by-reference computation and properly propagates `Locals`. + +```scala +import kyo.concurrent.fibers._ + +// Fork a computation +val a: Fiber[Int] > (Fibers with IOs) = + Fibers.forkFiber(Math.cos(42).toInt) + +// It's possible to "extract" the value of a +// 'Fiber' via the 'get' method. This is also +// refered as "joining the fiber" +val b: Int > (Fibers with IOs) = + Fibers.get(a) + +// Alternatively, the 'fork' method is a shorthand +// to fork the computation and join the fiber +val c: Int > (Fibers with IOs) = + Fibers.fork(Math.cos(42).toInt) + +// The 'value' method provides a 'Fiber' instance +// fulfilled with the provided pure value +val d: Fiber[Int] = + Fibers.value(42) + +// Use 'fail' to create a fiber that will fail +// with the provided exception +val e: Fiber[Int] = + Fibers.fail(new Exception) +``` + +The `parallel` methods fork multiple computations in parallel, join the fibers, and produce a tuple with their results. + +```scala +// There are method overloadings for up to four +// parallel computations +val a: (Int, String) > (Fibers with IOs) = + Fibers.parallel(Math.cos(42).toInt, "example") + +// Alternatively, it's possible to to provide +// a 'Seq' of computations and produce a 'Seq' +// with the results +val b: Seq[Int] > (Fibers with IOs) = + Fibers.parallel(Seq(Math.cos(42).toInt, Math.sin(42).toInt)) + +// The 'parallelFiber' method is similar but +// it doesn't automatically join the fibers and +// produces a 'Fiber[Seq[T]]' +val c: Fiber[Seq[Int]] > IOs = + Fibers.parallelFiber(Seq(Math.cos(42).toInt, Math.sin(42).toInt)) +``` + +The `race` methods are similar to `parallel` but they return the first computation to complete with either a successful result or a failure. Once the first result is produced, the other computations are automatically interrupted. + +```scala +// There are method overloadings for up to four +// computations +val a: Int > (Fibers with IOs) = + Fibers.race(Math.cos(42).toInt, Math.sin(42).toInt) + +// It's also possible to to provide a 'Seq' +// of computations +val b: Int > (Fibers with IOs) = + Fibers.race(Seq(Math.cos(42).toInt, Math.sin(42).toInt)) + +// 'raceFiber' produces a 'Fiber' without +// joining it +val c: Fiber[Int] > IOs = + Fibers.raceFiber(Seq(Math.cos(42).toInt, Math.sin(42).toInt)) +``` + +`Fibers` also provides `await` to wait for the completion of all computations and discard their results. + +```scala +// Also up to four parallel computations +val a: Unit > (Fibers with IOs) = + Fibers.await(Math.cos(42).toInt, "srt") + +// Unit a 'Seq' +val b: Unit > (Fibers with IOs) = + Fibers.await(Seq(Math.cos(42).toInt, Math.sin(42).toInt)) + +// Awaiting without joining +val c: Fiber[Unit] > IOs = + Fibers.awaitFiber(Seq(Math.cos(42).toInt, Math.sin(42).toInt)) +``` + +The `sleep` and `timeout` methods combine the `Timers` effect to pause a computation or time it out after a duration. + +```scala +import kyo.concurrent.timers._ +import scala.concurrent.duration._ + +// A computation that sleeps for 1s +val a: Unit > (Fibers with IOs with Timers) = + Fibers.sleep(1.second) + +// Times out and interrupts the provided +// computation in case it doesn't produce +// a result within 1s +val b: Int > (Fibers with IOs with Timers) = + Fibers.timeout(1.second)(Math.cos(42).toInt) +``` + +The `join` methods provide interoperability with Scala's `Future`. + +```scala +import scala.concurrent.Future + +// An example 'Future' instance +val a: Future[Int] = Future.successful(42) + +// Join the result of a 'Future' +val b: Int > (Fibers with IOs) = + Fibers.join(a) + +// Use 'joinFiber' to produce 'Fiber' +// instead of joining the computation +val c: Fiber[Int] > IOs = + Fibers.joinFiber(a) +``` + +> Important: Since Scala's `Future` doesn't provide an interruption mechanism, computations run to completion and don't get automatically interruped in `race` for example. + +A `Fiber` instance also provides a few relevant methods. + +```scala +// An example fiber +val a: Fiber[Int] = Fibers.value(42) + +// Check if the fiber is done +val b: Boolean > IOs = + a.isDone + +// Instance-level version of 'Fibers.get' +val c: Int > Fibers = + a.get + +// Avoid this low-level API to attach a +// a callback to a fiber +val d: Unit > IOs = + a.onComplete(println(_)) + +// A variant of `get` that returns a `Try` +// with the failed or successful result +val e: Try[Int] > (Fibers with IOs) = + a.getTry + +// Try to interrupt/cancel a fiber +val f: Boolean > IOs = + a.interrupt + +// Try to interrupt a fiber and wait for +// its finalization +val g: Boolean > (Fibers with IOs) = + a.interruptAwait + +// Transforms a fiber into a Scala 'Future' +val h: Future[Int] > IOs = + a.toFuture + +// The 'transform' method is similar to `flatMap` +// in Scala's 'Future' +val i: Fiber[Int] > IOs = + a.transform(v => Fibers.value(v + 1)) +``` + +Similarly to `IOs`, users should avoid handling the `Fibers` effect directly and rely on `kyo.App` instead. If strictly necessary, there are two methods to handle the `Fibers` effect: + +1. `run` takes a computation that has only the `Fibers` effect pending and returns a `Fiber` instance without blocking threads. +2. `runBlocking` accepts computations with arbritary pending effects but it handles asynchronous operations by blocking the current thread. + +```scala +// An example computation with fibers +val a: Int > (Fibers with IOs) = + Fibers.fork(Math.cos(42).toInt) + +// Avoid handling 'Fibers' directly +// Note how the code has to handle the +// 'IOs' effect and then handle 'Fibers' +val b: Fiber[Int] > IOs = + Fibers.run(IOs.runLazy(a)) + +// The 'runBlocking' method accepts +// arbitrary pending effects but relies +// on thread blocking. +val c: Int > IOs = + Fibers.runBlocking(a) +``` + +> Note: Handling the `Fibers` effect doesn't break referential transparecy as with `IOs` but its usage is not trivial due to the limitations of the pending effects, especially `IOs`. Prefer `kyo.App` instead. + +The `Fibers` effect also offers a low-level API to create `Promise`s as way to integrate external async operations with fibers. These APIs should be used only in low-level integration code. + +```scala +// Initialize a promise +val a: Promise[Int] > IOs = + Fibers.promise[Int] + +// Try to fulfill a promise +val b: Boolean > IOs = + a.map(_.complete(42)) +``` + +> A `Promise` is basically a `Fiber` with all the regular functionality plus the `complete` method to manually fulfill the promise. License ------- diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala index 3ed417722..4742b7a04 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala @@ -277,10 +277,10 @@ object fibers { (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3], s(3).asInstanceOf[T4]) ) - def parallel[T](l: List[T > (IOs with Fibers)]): Seq[T] > (IOs with Fibers) = + def parallel[T](l: Seq[T > (IOs with Fibers)]): Seq[T] > (IOs with Fibers) = Fibers.join(parallelFiber[T](l)) - def parallelFiber[T](l: List[T > (IOs with Fibers)]): Fiber[Seq[T]] > IOs = + def parallelFiber[T](l: Seq[T > (IOs with Fibers)]): Fiber[Seq[T]] > IOs = Locals.save.map { st => IOs { val p = new IOPromise[Seq[T]] @@ -313,14 +313,14 @@ object fibers { v1: => T > (IOs with Fibers), v2: => T > (IOs with Fibers) ): T > (IOs with Fibers) = - Fibers.join(raceFiber(List(IOs(v1), IOs(v2)))) + race(List(IOs(v1), IOs(v2))) def race[T]( v1: => T > (IOs with Fibers), v2: => T > (IOs with Fibers), v3: => T > (IOs with Fibers) ): T > (IOs with Fibers) = - Fibers.join(raceFiber(List(IOs(v1), IOs(v2), IOs(v2)))) + race(List(IOs(v1), IOs(v2), IOs(v2))) def race[T]( v1: => T > (IOs with Fibers), @@ -328,17 +328,12 @@ object fibers { v3: => T > (IOs with Fibers), v4: => T > (IOs with Fibers) ): T > (IOs with Fibers) = - Fibers.join(raceFiber(List(IOs(v1), IOs(v2), IOs(v2), IOs(v4)))) + race(List(IOs(v1), IOs(v2), IOs(v2), IOs(v4))) - private def foreach[T, U](l: List[T])(f: T => Unit): Unit = { - var curr = l - while (curr ne Nil) { - f(curr.head) - curr = curr.tail - } - } + def race[T](l: Seq[T > (IOs with Fibers)]): T > (IOs with Fibers) = + Fibers.join(raceFiber[T](l)) - def raceFiber[T](l: List[T > (IOs with Fibers)]): Fiber[T] > IOs = { + def raceFiber[T](l: Seq[T > (IOs with Fibers)]): Fiber[T] > IOs = { require(!l.isEmpty) Locals.save.map { st => IOs { @@ -379,7 +374,7 @@ object fibers { ): Unit > (IOs with Fibers) = Fibers.join(awaitFiber(List(IOs(v1), IOs(v2), IOs(v2), IOs(v4)))) - def awaitFiber[T](l: List[T > (IOs with Fibers)]): Fiber[Unit] > IOs = + def awaitFiber[T](l: Seq[T > (IOs with Fibers)]): Fiber[Unit] > IOs = Locals.save.map { st => IOs { val p = new IOPromise[Unit] @@ -460,6 +455,13 @@ object fibers { } } } + + private def foreach[T, U](l: Seq[T])(f: T => Unit): Unit = { + val it = l.iterator + while (it.hasNext) { + f(it.next()) + } + } } val Fibers = new Fibers