Skip to content

Commit

Permalink
fibers readme
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Sep 17, 2023
1 parent b5f6817 commit 856a8de
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 15 deletions.
214 changes: 213 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ assert(triesFirst(Tries.get(Failure(ex))) == Some(Failure(ex)))

## Core Effects

Kyo's core effects act as the essential building blocks that power your application's various functionalities. Unlike other libraries that might require heavy boilerplate or specialized knowledge, Kyo's core effects are designed to be straightforward and flexible. These core effects not only simplify the management of side-effects, dependencies, and several other aspects but also allow for a modular approach to building maintainable systems.

### Aborts: Short Circuiting

The `Aborts` effect is a generic implementation for short-circuiting effects. It's equivalent to ZIO's failure channel.
Expand Down Expand Up @@ -667,7 +669,217 @@ val d: Unit > IOs =

> Important: The `Loggers` effect chooses to consider the initialization of a `Logger` instance as being pure even though it may perform side effects. `Logger` instances need to be stored in constant fields for good performance, something not trivial to achieve if `Loggers.init` required an `IOs` suspension.
### Lists:
## Concurrent Effects

The `kyo.concurrent` package provides utilities for dealing with concurrency in Scala applications. It's a powerful set of effects designed for easier asynchronous programming, built on top of other core functionalities provided by the `kyo` package.

### Fibers: Managed Green Threads

The `Fibers` effect can fork a computation to execute asynchronously in Kyo's managed thread pool. The `forkFiber` method is the fundamental building block of the effect's features. It takes a by-reference computation and properly propagates `Locals`.

```scala
import kyo.concurrent.fibers._

// Fork a computation
val a: Fiber[Int] > (Fibers with IOs) =
Fibers.forkFiber(Math.cos(42).toInt)

// It's possible to "extract" the value of a
// 'Fiber' via the 'get' method. This is also
// refered as "joining the fiber"
val b: Int > (Fibers with IOs) =
Fibers.get(a)

// Alternatively, the 'fork' method is a shorthand
// to fork the computation and join the fiber
val c: Int > (Fibers with IOs) =
Fibers.fork(Math.cos(42).toInt)

// The 'value' method provides a 'Fiber' instance
// fulfilled with the provided pure value
val d: Fiber[Int] =
Fibers.value(42)

// Use 'fail' to create a fiber that will fail
// with the provided exception
val e: Fiber[Int] =
Fibers.fail(new Exception)
```

The `parallel` methods fork multiple computations in parallel, join the fibers, and produce a tuple with their results.

```scala
// There are method overloadings for up to four
// parallel computations
val a: (Int, String) > (Fibers with IOs) =
Fibers.parallel(Math.cos(42).toInt, "example")

// Alternatively, it's possible to to provide
// a 'Seq' of computations and produce a 'Seq'
// with the results
val b: Seq[Int] > (Fibers with IOs) =
Fibers.parallel(Seq(Math.cos(42).toInt, Math.sin(42).toInt))

// The 'parallelFiber' method is similar but
// it doesn't automatically join the fibers and
// produces a 'Fiber[Seq[T]]'
val c: Fiber[Seq[Int]] > IOs =
Fibers.parallelFiber(Seq(Math.cos(42).toInt, Math.sin(42).toInt))
```

The `race` methods are similar to `parallel` but they return the first computation to complete with either a successful result or a failure. Once the first result is produced, the other computations are automatically interrupted.

```scala
// There are method overloadings for up to four
// computations
val a: Int > (Fibers with IOs) =
Fibers.race(Math.cos(42).toInt, Math.sin(42).toInt)

// It's also possible to to provide a 'Seq'
// of computations
val b: Int > (Fibers with IOs) =
Fibers.race(Seq(Math.cos(42).toInt, Math.sin(42).toInt))

// 'raceFiber' produces a 'Fiber' without
// joining it
val c: Fiber[Int] > IOs =
Fibers.raceFiber(Seq(Math.cos(42).toInt, Math.sin(42).toInt))
```

`Fibers` also provides `await` to wait for the completion of all computations and discard their results.

```scala
// Also up to four parallel computations
val a: Unit > (Fibers with IOs) =
Fibers.await(Math.cos(42).toInt, "srt")

// Unit a 'Seq'
val b: Unit > (Fibers with IOs) =
Fibers.await(Seq(Math.cos(42).toInt, Math.sin(42).toInt))

// Awaiting without joining
val c: Fiber[Unit] > IOs =
Fibers.awaitFiber(Seq(Math.cos(42).toInt, Math.sin(42).toInt))
```

The `sleep` and `timeout` methods combine the `Timers` effect to pause a computation or time it out after a duration.

```scala
import kyo.concurrent.timers._
import scala.concurrent.duration._

// A computation that sleeps for 1s
val a: Unit > (Fibers with IOs with Timers) =
Fibers.sleep(1.second)

// Times out and interrupts the provided
// computation in case it doesn't produce
// a result within 1s
val b: Int > (Fibers with IOs with Timers) =
Fibers.timeout(1.second)(Math.cos(42).toInt)
```

The `join` methods provide interoperability with Scala's `Future`.

```scala
import scala.concurrent.Future

// An example 'Future' instance
val a: Future[Int] = Future.successful(42)

// Join the result of a 'Future'
val b: Int > (Fibers with IOs) =
Fibers.join(a)

// Use 'joinFiber' to produce 'Fiber'
// instead of joining the computation
val c: Fiber[Int] > IOs =
Fibers.joinFiber(a)
```

> Important: Since Scala's `Future` doesn't provide an interruption mechanism, computations run to completion and don't get automatically interruped in `race` for example.
A `Fiber` instance also provides a few relevant methods.

```scala
// An example fiber
val a: Fiber[Int] = Fibers.value(42)

// Check if the fiber is done
val b: Boolean > IOs =
a.isDone

// Instance-level version of 'Fibers.get'
val c: Int > Fibers =
a.get

// Avoid this low-level API to attach a
// a callback to a fiber
val d: Unit > IOs =
a.onComplete(println(_))

// A variant of `get` that returns a `Try`
// with the failed or successful result
val e: Try[Int] > (Fibers with IOs) =
a.getTry

// Try to interrupt/cancel a fiber
val f: Boolean > IOs =
a.interrupt

// Try to interrupt a fiber and wait for
// its finalization
val g: Boolean > (Fibers with IOs) =
a.interruptAwait

// Transforms a fiber into a Scala 'Future'
val h: Future[Int] > IOs =
a.toFuture

// The 'transform' method is similar to `flatMap`
// in Scala's 'Future'
val i: Fiber[Int] > IOs =
a.transform(v => Fibers.value(v + 1))
```

Similarly to `IOs`, users should avoid handling the `Fibers` effect directly and rely on `kyo.App` instead. If strictly necessary, there are two methods to handle the `Fibers` effect:

1. `run` takes a computation that has only the `Fibers` effect pending and returns a `Fiber` instance without blocking threads.
2. `runBlocking` accepts computations with arbritary pending effects but it handles asynchronous operations by blocking the current thread.

```scala
// An example computation with fibers
val a: Int > (Fibers with IOs) =
Fibers.fork(Math.cos(42).toInt)

// Avoid handling 'Fibers' directly
// Note how the code has to handle the
// 'IOs' effect and then handle 'Fibers'
val b: Fiber[Int] > IOs =
Fibers.run(IOs.runLazy(a))

// The 'runBlocking' method accepts
// arbitrary pending effects but relies
// on thread blocking.
val c: Int > IOs =
Fibers.runBlocking(a)
```

> Note: Handling the `Fibers` effect doesn't break referential transparecy as with `IOs` but its usage is not trivial due to the limitations of the pending effects, especially `IOs`. Prefer `kyo.App` instead.
The `Fibers` effect also offers a low-level API to create `Promise`s as way to integrate external async operations with fibers. These APIs should be used only in low-level integration code.

```scala
// Initialize a promise
val a: Promise[Int] > IOs =
Fibers.promise[Int]

// Try to fulfill a promise
val b: Boolean > IOs =
a.map(_.complete(42))
```

> A `Promise` is basically a `Fiber` with all the regular functionality plus the `complete` method to manually fulfill the promise.
License
-------
Expand Down
30 changes: 16 additions & 14 deletions kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ object fibers {
(s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3], s(3).asInstanceOf[T4])
)

def parallel[T](l: List[T > (IOs with Fibers)]): Seq[T] > (IOs with Fibers) =
def parallel[T](l: Seq[T > (IOs with Fibers)]): Seq[T] > (IOs with Fibers) =
Fibers.join(parallelFiber[T](l))

def parallelFiber[T](l: List[T > (IOs with Fibers)]): Fiber[Seq[T]] > IOs =
def parallelFiber[T](l: Seq[T > (IOs with Fibers)]): Fiber[Seq[T]] > IOs =
Locals.save.map { st =>
IOs {
val p = new IOPromise[Seq[T]]
Expand Down Expand Up @@ -313,32 +313,27 @@ object fibers {
v1: => T > (IOs with Fibers),
v2: => T > (IOs with Fibers)
): T > (IOs with Fibers) =
Fibers.join(raceFiber(List(IOs(v1), IOs(v2))))
race(List(IOs(v1), IOs(v2)))

def race[T](
v1: => T > (IOs with Fibers),
v2: => T > (IOs with Fibers),
v3: => T > (IOs with Fibers)
): T > (IOs with Fibers) =
Fibers.join(raceFiber(List(IOs(v1), IOs(v2), IOs(v2))))
race(List(IOs(v1), IOs(v2), IOs(v2)))

def race[T](
v1: => T > (IOs with Fibers),
v2: => T > (IOs with Fibers),
v3: => T > (IOs with Fibers),
v4: => T > (IOs with Fibers)
): T > (IOs with Fibers) =
Fibers.join(raceFiber(List(IOs(v1), IOs(v2), IOs(v2), IOs(v4))))
race(List(IOs(v1), IOs(v2), IOs(v2), IOs(v4)))

private def foreach[T, U](l: List[T])(f: T => Unit): Unit = {
var curr = l
while (curr ne Nil) {
f(curr.head)
curr = curr.tail
}
}
def race[T](l: Seq[T > (IOs with Fibers)]): T > (IOs with Fibers) =
Fibers.join(raceFiber[T](l))

def raceFiber[T](l: List[T > (IOs with Fibers)]): Fiber[T] > IOs = {
def raceFiber[T](l: Seq[T > (IOs with Fibers)]): Fiber[T] > IOs = {
require(!l.isEmpty)
Locals.save.map { st =>
IOs {
Expand Down Expand Up @@ -379,7 +374,7 @@ object fibers {
): Unit > (IOs with Fibers) =
Fibers.join(awaitFiber(List(IOs(v1), IOs(v2), IOs(v2), IOs(v4))))

def awaitFiber[T](l: List[T > (IOs with Fibers)]): Fiber[Unit] > IOs =
def awaitFiber[T](l: Seq[T > (IOs with Fibers)]): Fiber[Unit] > IOs =
Locals.save.map { st =>
IOs {
val p = new IOPromise[Unit]
Expand Down Expand Up @@ -460,6 +455,13 @@ object fibers {
}
}
}

private def foreach[T, U](l: Seq[T])(f: T => Unit): Unit = {
val it = l.iterator
while (it.hasNext) {
f(it.next())
}
}
}
val Fibers = new Fibers

Expand Down

0 comments on commit 856a8de

Please sign in to comment.