Skip to content

Commit

Permalink
cleanup fibers api + adopt method name conventions
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Nov 28, 2023
1 parent 6b27988 commit 30b1882
Show file tree
Hide file tree
Showing 24 changed files with 90 additions and 101 deletions.
33 changes: 14 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -815,27 +815,22 @@ The `kyo.concurrent` package provides utilities for dealing with concurrency in

### Fibers: Green Threads

The `Fibers` effect allows for the asynchronous execution of computations via a managed thread pool. The core function, `forkFiber`, spawns a new "green thread," also known as a fiber, to handle the given computation. This provides a powerful mechanism for parallel execution and efficient use of system resources. Moreover, fibers maintain proper propagation of `Locals`, ensuring that context information is carried along during the forking process.
The `Fibers` effect allows for the asynchronous execution of computations via a managed thread pool. The core function, `init`, spawns a new "green thread," also known as a fiber, to handle the given computation. This provides a powerful mechanism for parallel execution and efficient use of system resources. Moreover, fibers maintain proper propagation of `Locals`, ensuring that context information is carried along during the forking process.

```scala
import kyo.concurrent.fibers._

// Fork a computation. The parameter is
// taken by reference and automatically
// suspended with 'IOs'
val a: Fiber[Int] > Fibers =
Fibers.fork(Math.cos(42).toInt)
val a: Fiber[Int] > IOs =
Fibers.init(Math.cos(42).toInt)

// It's possible to "extract" the value of a
// 'Fiber' via the 'get' method. This is also
// referred as "joining the fiber"
val b: Int > Fibers =
a.map(_.get)

// The 'value' method provides a 'Fiber' instance
// fulfilled with the provided pure value
val d: Fiber[Int] =
Fibers.value(42)
Fibers.get(a)
```

The `parallel` methods fork multiple computations in parallel, join the fibers, and return their results.
Expand Down Expand Up @@ -887,7 +882,7 @@ val d: Fiber[Int] > IOs =
Fibers.raceFiber(Seq(a, a.map(_ + 1)))
```

The `sleep` and `timeout` methods combine the `Timers` effect to pause a computation or time it out after a duration.
The `sleep` and `timeout` methods pause a computation or time it out after a duration.

```scala
import kyo.concurrent.timers._
Expand All @@ -904,7 +899,7 @@ val b: Int > Fibers =
Fibers.timeout(1.second)(Math.cos(42).toInt)
```

The `join` methods provide interoperability with Scala's `Future`.
The `fromFuture` methods provide interoperability with Scala's `Future`.

```scala
import scala.concurrent.Future
Expand All @@ -914,12 +909,12 @@ val a: Future[Int] = Future.successful(42)

// Join the result of a 'Future'
val b: Int > Fibers =
Fibers.join(a)
Fibers.fromFuture(a)

// Use 'joinFiber' to produce 'Fiber'
// Use 'fromFutureFiber' to produce 'Fiber'
// instead of joining the computation
val c: Fiber[Int] > IOs =
Fibers.joinFiber(a)
Fibers.fromFutureFiber(a)
```

> Important: Keep in mind that Scala's Future lacks built-in support for interruption. As a result, any computations executed through Future will run to completion, even if they're involved in a race operation where another computation finishes first.
Expand Down Expand Up @@ -970,7 +965,7 @@ Similarly to `IOs`, users should avoid handling the `Fibers` effect directly and
```scala
// An example computation with fibers
val a: Int > Fibers =
Fibers.fork(Math.cos(42).toInt).map(_.get)
Fibers.init(Math.cos(42).toInt).map(_.get)

// Avoid handling 'Fibers' directly
// Note how the code has to handle the
Expand Down Expand Up @@ -1547,26 +1542,26 @@ def test[T](v: T > Options)(implicit f: Flat[T]) =
Options.run(v)
```

All APIs that trigger effect handling have this restriction, which includes not only methods that handle effects directly but also methods that use effect handling internally. For example, `Fibers.fork` handles effects internally and doesn't allow nested effects.
All APIs that trigger effect handling have this restriction, which includes not only methods that handle effects directly but also methods that use effect handling internally. For example, `Fibers.init` handles effects internally and doesn't allow nested effects.

```scala
// An example nested computation
val a: Int > IOs > IOs =
IOs(IOs(1))

// Fails to compile:
// Fibers.fork(a)
// Fibers.init(a)
```

The compile-time checking mechanism can also be triggered in scenarios where Scala's type inference artificially introduces nesting due to a mismatch between the effects suported by a method and the provided input. In this scenario, the error message contains an additional observation regarding this possibility. For example, `Fibers.fork` only accepts computations with `Fibers` pending.
The compile-time checking mechanism can also be triggered in scenarios where Scala's type inference artificially introduces nesting due to a mismatch between the effects suported by a method and the provided input. In this scenario, the error message contains an additional observation regarding this possibility. For example, `Fibers.init` only accepts computations with `Fibers` pending.

```scala
// Example computation with a
// mismatching effect (Options)
val a: Int > Options =
Options.get(Some(1))

// Fibers.fork(a)
// Fibers.init(a)
// Compilation failure:
// Method doesn't accept nested Kyo computations.
// Detected: 'scala.Int > kyo.options.Options > kyo.concurrent.fibers.Fibers'. Consider using 'flatten' to resolve.
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/Bench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object Bench {

abstract class Fork[T](implicit f: Flat[T]) extends Bench[T] {
@Benchmark
def forkKyo(): T = IOs.run(Fibers.fork(kyoBenchFiber()).flatMap(_.block))
def forkKyo(): T = IOs.run(Fibers.init(kyoBenchFiber()).flatMap(_.block))

@Benchmark
def forkCats(): T = IO.cede.flatMap(_ => catsBench()).unsafeRunSync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CountdownLatchBench extends Bench.ForkOnly[Int] {

for {
l <- Latches.init(depth)
_ <- Fibers.fork(iterate(l, depth))
_ <- Fibers.init(iterate(l, depth))
_ <- l.await
} yield 0
}
Expand Down
4 changes: 2 additions & 2 deletions kyo-bench/src/main/scala/kyo/bench/ForkChainedBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class ForkChainedBench extends Bench.ForkOnly[Int] {

def iterate(p: Promise[Unit], n: Int): Unit > IOs =
if (n <= 0) p.complete(()).unit
else IOs.unit.flatMap(_ => Fibers.fork(iterate(p, n - 1)).unit)
else IOs.unit.flatMap(_ => Fibers.init(iterate(p, n - 1)).unit)

for {
p <- Fibers.initPromise[Unit]
_ <- Fibers.fork(iterate(p, depth))
_ <- Fibers.init(iterate(p, depth))
_ <- p.get
} yield 0
}
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ForkManyBench extends Bench.ForkOnly[Int] {
case _ =>
false
}
_ <- repeat(depth)(Fibers.fork(effect))
_ <- repeat(depth)(Fibers.init(effect))
_ <- promise.get
} yield 0
}
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/ForkSpawnBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ForkSpawnBench extends Bench.ForkOnly[Unit] {
if (level == depth) {
cdl.release
} else {
repeat(width)(Fibers.fork(loop(cdl, level + 1)).map(_ => ()))
repeat(width)(Fibers.init(loop(cdl, level + 1)).map(_ => ()))
}

for {
Expand Down
6 changes: 3 additions & 3 deletions kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ class PingPongBench extends Bench.ForkOnly[Unit] {
chan <- Channels.init[Unit](1)
effect =
for {
_ <- Fibers.fork(chan.put(()))
_ <- Fibers.init(chan.put(()))
_ <- chan.take
n <- ref.decrementAndGet
_ <- if (n == 0) promise.complete(()).unit else IOs.unit
} yield ()
_ <- repeat(depth)(Fibers.fork(effect))
_ <- repeat(depth)(Fibers.init(effect))
} yield ()

for {
promise <- Fibers.initPromise[Unit]
_ <- Fibers.fork(iterate(promise, depth))
_ <- Fibers.init(iterate(promise, depth))
_ <- promise.get
} yield ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class ProducerConsumerBench extends Bench.ForkOnly[Unit] {

Channels.init[Unit](depth / 2, Access.Spsc).flatMap { q =>
for {
producer <- Fibers.fork(repeat(depth)(q.put(())))
consumer <- Fibers.fork(repeat(depth)(q.take))
producer <- Fibers.init(repeat(depth)(q.put(())))
consumer <- Fibers.init(repeat(depth)(q.take))
_ <- producer.get
_ <- consumer.get
} yield {}
Expand Down
4 changes: 2 additions & 2 deletions kyo-bench/src/main/scala/kyo/bench/RendezvousBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ class RendezvousBench extends Bench.ForkOnly[Int] {

for {
waiting <- Atomics.initRef[Any](null)
_ <- Fibers.fork(produce(waiting))
consumer <- Fibers.fork(consume(waiting))
_ <- Fibers.init(produce(waiting))
consumer <- Fibers.init(consume(waiting))
res <- consumer.get
} yield res
}
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/SchedulingBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SchedulingBench extends Bench.ForkOnly[Int] {
}

Lists.traverse(range) { i =>
Fibers.fork(fiber(i))
Fibers.init(fiber(i))
}.map { fibers =>
Lists.traverse(fibers)(_.get)
}.map(_.sum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SemaphoreContentionBench extends Bench.ForkOnly[Unit] {
for {
sem <- Meters.initSemaphore(permits)
cdl <- Latches.init(fibers)
_ <- repeat(fibers)(Fibers.fork(loop(sem, cdl)))
_ <- repeat(fibers)(Fibers.init(loop(sem, cdl)))
_ <- cdl.await
} yield {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class cachesTest extends KyoTest {
for {
c <- Caches.init(_.maxSize(4))
m = c.memo { (v: Int) =>
Fibers.fork[Int] {
Fibers.init[Int] {
calls += 1
v + 1
}.map(_.get)
Expand All @@ -44,7 +44,7 @@ class cachesTest extends KyoTest {
for {
c <- Caches.init(_.maxSize(4))
m = c.memo { (v: Int) =>
Fibers.fork[Int] {
Fibers.init[Int] {
calls += 1
if (calls == 1)
IOs.fail(ex)
Expand Down
6 changes: 3 additions & 3 deletions kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ object MonadLawsTest extends ZIOSpecDefault {
Gen.oneOf(
gen.map(v => (v: A > Fibers)),
gen.map(v => IOs(v)),
gen.map(v => Fibers.fork(v).map(_.get)),
gen.map(v => IOs(Fibers.fork(v).map(_.get))),
gen.map(v => Fibers.fork(IOs(v)).map(_.get))
gen.map(v => Fibers.init(v).map(_.get)),
gen.map(v => IOs(Fibers.init(v).map(_.get))),
gen.map(v => Fibers.init(IOs(v)).map(_.get))
).map(Myo(_))
}

Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object App {
def v3: Try[T] > Fibers = Tries.run(v2)
def v4: Try[T] > Fibers = Fibers.timeout(timeout)(v3)
def v5: Try[T] > Fibers = Tries.run(v4).map(_.flatten)
def v6: Try[T] > Fibers = Fibers.fork(v5).map(_.get)
def v6: Try[T] > Fibers = Fibers.init(v5).map(_.get)
def v7: Fiber[Try[T]] > IOs = Fibers.run(v6)
IOs.run(v7)
}
Expand Down
28 changes: 11 additions & 17 deletions kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ object fibers {
def get: T > Fibers =
state match {
case promise: IOPromise[_] =>
Fibers.join(state)
FiberGets(state)
case failed: Failed =>
Fibers.join(state)
FiberGets(state)
case _ =>
state.asInstanceOf[T > Fibers]
}
Expand All @@ -85,7 +85,7 @@ object fibers {
promise.onComplete { t =>
p.complete(Try(IOs.run(t)))
}
Fibers.join(Fiber.promise(p))
Fibers.get(Fiber.promise(p))
}
case Failed(ex) =>
Failure(ex)
Expand Down Expand Up @@ -157,7 +157,7 @@ object fibers {
try t(state.asInstanceOf[T])
catch {
case ex if (NonFatal(ex)) =>
Fibers.fail(ex)
Fiber.failed(ex)
}
}
}
Expand Down Expand Up @@ -188,12 +188,6 @@ object fibers {
def get[T, S](v: Fiber[T] > S): T > (Fibers with S) =
v.map(_.get)

private[fibers] def join[T, S](v: Fiber[T] > S): T > (Fibers with S) =
FiberGets(v)

def fail[T](ex: Throwable): Fiber[T] =
Fiber.failed(ex)

private val _promise = IOs(unsafeInitPromise[Object])

def initPromise[T]: Promise[T] > IOs =
Expand All @@ -206,15 +200,15 @@ object fibers {
private val IOTask = kyo.concurrent.scheduler.IOTask

/*inline*/
def fork[T]( /*inline*/ v: => T > Fibers)(implicit f: Flat[T > Fibers]): Fiber[T] > IOs =
def init[T]( /*inline*/ v: => T > Fibers)(implicit f: Flat[T > Fibers]): Fiber[T] > IOs =
Locals.save.map(st => Fiber.promise(IOTask(IOs(v), st)))

def parallel[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Seq[T] > Fibers =
l.size match {
case 0 => Seq.empty
case 1 => l(0).map(Seq(_))
case _ =>
Fibers.join(parallelFiber[T](l))
Fibers.get(parallelFiber[T](l))
}

def parallelFiber[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Fiber[Seq[T]] > IOs =
Expand Down Expand Up @@ -256,7 +250,7 @@ object fibers {
case 0 => IOs.fail("Can't race an empty list.")
case 1 => l(0)
case _ =>
Fibers.join(raceFiber[T](l))
Fibers.get(raceFiber[T](l))
}

def raceFiber[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Fiber[T] > IOs =
Expand Down Expand Up @@ -300,7 +294,7 @@ object fibers {
}

def timeout[T](d: Duration)(v: => T > Fibers)(implicit f: Flat[T > Fibers]): T > Fibers =
fork(v).map { f =>
init(v).map { f =>
val timeout: Unit > IOs =
IOs {
IOTask(IOs(f.interrupt), Locals.State.empty)
Expand All @@ -311,10 +305,10 @@ object fibers {
}
}

def join[T, S](f: Future[T]): T > Fibers =
Fibers.join(joinFiber(f))
def fromFuture[T, S](f: Future[T]): T > Fibers =
Fibers.get(fromFutureFiber(f))

def joinFiber[T](f: Future[T]): Fiber[T] > IOs = {
def fromFutureFiber[T](f: Future[T]): Fiber[T] > IOs = {
Locals.save.map { st =>
IOs {
val p = new IOPromise[T]()
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object hubs {
Channels.init[T](capacity).map { ch =>
IOs {
val listeners = new CopyOnWriteArraySet[Channel[T]]
Fibers.fork {
Fibers.init {
def loop(): Unit > Fibers =
ch.take.map { v =>
IOs {
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/test/scala/kyoTest/KyoAppTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class AppTest extends KyoTest {
_ <- Consoles.println("1")
_ <- Clocks.now
_ <- Resources.ensure(())
_ <- Fibers.fork(())
_ <- Fibers.init(())
} yield ()
}
app.main(Array())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class channelsTest extends KyoTest {
b <- c.offer(1)
put <- c.putFiber(2)
f <- c.isFull
take1 <- Fibers.fork(c.takeFiber)
take2 <- Fibers.fork(c.takeFiber)
take1 <- Fibers.init(c.takeFiber)
take2 <- Fibers.init(c.takeFiber)
v1 <- take1.get
_ <- put.get
v2 <- take1.get
Expand Down
Loading

0 comments on commit 30b1882

Please sign in to comment.