From 30b18823b429bd5430e056979c7cccf524fcbcca Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 27 Nov 2023 22:25:39 -0800 Subject: [PATCH] cleanup fibers api + adopt method name conventions --- README.md | 33 ++++++++----------- .../src/main/scala/kyo/bench/Bench.scala | 2 +- .../scala/kyo/bench/CountdownLatchBench.scala | 2 +- .../scala/kyo/bench/ForkChainedBench.scala | 4 +-- .../main/scala/kyo/bench/ForkManyBench.scala | 2 +- .../main/scala/kyo/bench/ForkSpawnBench.scala | 2 +- .../main/scala/kyo/bench/PingPongBench.scala | 6 ++-- .../kyo/bench/ProducerConsumerBench.scala | 4 +-- .../scala/kyo/bench/RendezvousBench.scala | 4 +-- .../scala/kyo/bench/SchedulingBench.scala | 2 +- .../kyo/bench/SemaphoreContentionBench.scala | 2 +- .../scala/kyo/concurrent/cachesTest.scala | 4 +-- .../test/scala/kyoTest/MonadLawsTest.scala | 6 ++-- kyo-core/shared/src/main/scala/kyo/App.scala | 2 +- .../main/scala/kyo/concurrent/fibers.scala | 28 +++++++--------- .../src/main/scala/kyo/concurrent/hubs.scala | 2 +- .../src/test/scala/kyoTest/KyoAppTest.scala | 2 +- .../kyoTest/concurrent/channelsTest.scala | 4 +-- .../scala/kyoTest/concurrent/fibersTest.scala | 32 +++++++++--------- .../scala/kyoTest/concurrent/hubsTest.scala | 12 +++---- .../kyoTest/concurrent/latchesTest.scala | 2 +- .../scala/kyoTest/concurrent/metersTest.scala | 28 ++++++++-------- .../src/main/scala/kyo/PlatformBackend.scala | 2 +- .../scala/kyo/server/NettyKyoServer.scala | 4 +-- 24 files changed, 90 insertions(+), 101 deletions(-) diff --git a/README.md b/README.md index 6b02fc295..f9636f1f6 100644 --- a/README.md +++ b/README.md @@ -815,7 +815,7 @@ The `kyo.concurrent` package provides utilities for dealing with concurrency in ### Fibers: Green Threads -The `Fibers` effect allows for the asynchronous execution of computations via a managed thread pool. The core function, `forkFiber`, spawns a new "green thread," also known as a fiber, to handle the given computation. This provides a powerful mechanism for parallel execution and efficient use of system resources. Moreover, fibers maintain proper propagation of `Locals`, ensuring that context information is carried along during the forking process. +The `Fibers` effect allows for the asynchronous execution of computations via a managed thread pool. The core function, `init`, spawns a new "green thread," also known as a fiber, to handle the given computation. This provides a powerful mechanism for parallel execution and efficient use of system resources. Moreover, fibers maintain proper propagation of `Locals`, ensuring that context information is carried along during the forking process. ```scala import kyo.concurrent.fibers._ @@ -823,19 +823,14 @@ import kyo.concurrent.fibers._ // Fork a computation. The parameter is // taken by reference and automatically // suspended with 'IOs' -val a: Fiber[Int] > Fibers = - Fibers.fork(Math.cos(42).toInt) +val a: Fiber[Int] > IOs = + Fibers.init(Math.cos(42).toInt) // It's possible to "extract" the value of a // 'Fiber' via the 'get' method. This is also // referred as "joining the fiber" val b: Int > Fibers = - a.map(_.get) - -// The 'value' method provides a 'Fiber' instance -// fulfilled with the provided pure value -val d: Fiber[Int] = - Fibers.value(42) + Fibers.get(a) ``` The `parallel` methods fork multiple computations in parallel, join the fibers, and return their results. @@ -887,7 +882,7 @@ val d: Fiber[Int] > IOs = Fibers.raceFiber(Seq(a, a.map(_ + 1))) ``` -The `sleep` and `timeout` methods combine the `Timers` effect to pause a computation or time it out after a duration. +The `sleep` and `timeout` methods pause a computation or time it out after a duration. ```scala import kyo.concurrent.timers._ @@ -904,7 +899,7 @@ val b: Int > Fibers = Fibers.timeout(1.second)(Math.cos(42).toInt) ``` -The `join` methods provide interoperability with Scala's `Future`. +The `fromFuture` methods provide interoperability with Scala's `Future`. ```scala import scala.concurrent.Future @@ -914,12 +909,12 @@ val a: Future[Int] = Future.successful(42) // Join the result of a 'Future' val b: Int > Fibers = - Fibers.join(a) + Fibers.fromFuture(a) -// Use 'joinFiber' to produce 'Fiber' +// Use 'fromFutureFiber' to produce 'Fiber' // instead of joining the computation val c: Fiber[Int] > IOs = - Fibers.joinFiber(a) + Fibers.fromFutureFiber(a) ``` > Important: Keep in mind that Scala's Future lacks built-in support for interruption. As a result, any computations executed through Future will run to completion, even if they're involved in a race operation where another computation finishes first. @@ -970,7 +965,7 @@ Similarly to `IOs`, users should avoid handling the `Fibers` effect directly and ```scala // An example computation with fibers val a: Int > Fibers = - Fibers.fork(Math.cos(42).toInt).map(_.get) + Fibers.init(Math.cos(42).toInt).map(_.get) // Avoid handling 'Fibers' directly // Note how the code has to handle the @@ -1547,7 +1542,7 @@ def test[T](v: T > Options)(implicit f: Flat[T]) = Options.run(v) ``` -All APIs that trigger effect handling have this restriction, which includes not only methods that handle effects directly but also methods that use effect handling internally. For example, `Fibers.fork` handles effects internally and doesn't allow nested effects. +All APIs that trigger effect handling have this restriction, which includes not only methods that handle effects directly but also methods that use effect handling internally. For example, `Fibers.init` handles effects internally and doesn't allow nested effects. ```scala // An example nested computation @@ -1555,10 +1550,10 @@ val a: Int > IOs > IOs = IOs(IOs(1)) // Fails to compile: -// Fibers.fork(a) +// Fibers.init(a) ``` -The compile-time checking mechanism can also be triggered in scenarios where Scala's type inference artificially introduces nesting due to a mismatch between the effects suported by a method and the provided input. In this scenario, the error message contains an additional observation regarding this possibility. For example, `Fibers.fork` only accepts computations with `Fibers` pending. +The compile-time checking mechanism can also be triggered in scenarios where Scala's type inference artificially introduces nesting due to a mismatch between the effects suported by a method and the provided input. In this scenario, the error message contains an additional observation regarding this possibility. For example, `Fibers.init` only accepts computations with `Fibers` pending. ```scala // Example computation with a @@ -1566,7 +1561,7 @@ The compile-time checking mechanism can also be triggered in scenarios where Sca val a: Int > Options = Options.get(Some(1)) -// Fibers.fork(a) +// Fibers.init(a) // Compilation failure: // Method doesn't accept nested Kyo computations. // Detected: 'scala.Int > kyo.options.Options > kyo.concurrent.fibers.Fibers'. Consider using 'flatten' to resolve. diff --git a/kyo-bench/src/main/scala/kyo/bench/Bench.scala b/kyo-bench/src/main/scala/kyo/bench/Bench.scala index 24ee84ced..3a73440be 100644 --- a/kyo-bench/src/main/scala/kyo/bench/Bench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/Bench.scala @@ -43,7 +43,7 @@ object Bench { abstract class Fork[T](implicit f: Flat[T]) extends Bench[T] { @Benchmark - def forkKyo(): T = IOs.run(Fibers.fork(kyoBenchFiber()).flatMap(_.block)) + def forkKyo(): T = IOs.run(Fibers.init(kyoBenchFiber()).flatMap(_.block)) @Benchmark def forkCats(): T = IO.cede.flatMap(_ => catsBench()).unsafeRunSync() diff --git a/kyo-bench/src/main/scala/kyo/bench/CountdownLatchBench.scala b/kyo-bench/src/main/scala/kyo/bench/CountdownLatchBench.scala index abc4be134..13fd8e8de 100644 --- a/kyo-bench/src/main/scala/kyo/bench/CountdownLatchBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/CountdownLatchBench.scala @@ -31,7 +31,7 @@ class CountdownLatchBench extends Bench.ForkOnly[Int] { for { l <- Latches.init(depth) - _ <- Fibers.fork(iterate(l, depth)) + _ <- Fibers.init(iterate(l, depth)) _ <- l.await } yield 0 } diff --git a/kyo-bench/src/main/scala/kyo/bench/ForkChainedBench.scala b/kyo-bench/src/main/scala/kyo/bench/ForkChainedBench.scala index d227e5583..04d308439 100644 --- a/kyo-bench/src/main/scala/kyo/bench/ForkChainedBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/ForkChainedBench.scala @@ -27,11 +27,11 @@ class ForkChainedBench extends Bench.ForkOnly[Int] { def iterate(p: Promise[Unit], n: Int): Unit > IOs = if (n <= 0) p.complete(()).unit - else IOs.unit.flatMap(_ => Fibers.fork(iterate(p, n - 1)).unit) + else IOs.unit.flatMap(_ => Fibers.init(iterate(p, n - 1)).unit) for { p <- Fibers.initPromise[Unit] - _ <- Fibers.fork(iterate(p, depth)) + _ <- Fibers.init(iterate(p, depth)) _ <- p.get } yield 0 } diff --git a/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala b/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala index 81c025aaa..11ac7ac31 100644 --- a/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala @@ -43,7 +43,7 @@ class ForkManyBench extends Bench.ForkOnly[Int] { case _ => false } - _ <- repeat(depth)(Fibers.fork(effect)) + _ <- repeat(depth)(Fibers.init(effect)) _ <- promise.get } yield 0 } diff --git a/kyo-bench/src/main/scala/kyo/bench/ForkSpawnBench.scala b/kyo-bench/src/main/scala/kyo/bench/ForkSpawnBench.scala index e8095495b..62a75f164 100644 --- a/kyo-bench/src/main/scala/kyo/bench/ForkSpawnBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/ForkSpawnBench.scala @@ -46,7 +46,7 @@ class ForkSpawnBench extends Bench.ForkOnly[Unit] { if (level == depth) { cdl.release } else { - repeat(width)(Fibers.fork(loop(cdl, level + 1)).map(_ => ())) + repeat(width)(Fibers.init(loop(cdl, level + 1)).map(_ => ())) } for { diff --git a/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala b/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala index 93fa73dba..6614eb10f 100644 --- a/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala @@ -51,17 +51,17 @@ class PingPongBench extends Bench.ForkOnly[Unit] { chan <- Channels.init[Unit](1) effect = for { - _ <- Fibers.fork(chan.put(())) + _ <- Fibers.init(chan.put(())) _ <- chan.take n <- ref.decrementAndGet _ <- if (n == 0) promise.complete(()).unit else IOs.unit } yield () - _ <- repeat(depth)(Fibers.fork(effect)) + _ <- repeat(depth)(Fibers.init(effect)) } yield () for { promise <- Fibers.initPromise[Unit] - _ <- Fibers.fork(iterate(promise, depth)) + _ <- Fibers.init(iterate(promise, depth)) _ <- promise.get } yield () } diff --git a/kyo-bench/src/main/scala/kyo/bench/ProducerConsumerBench.scala b/kyo-bench/src/main/scala/kyo/bench/ProducerConsumerBench.scala index a80c42261..fd569a54e 100644 --- a/kyo-bench/src/main/scala/kyo/bench/ProducerConsumerBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/ProducerConsumerBench.scala @@ -38,8 +38,8 @@ class ProducerConsumerBench extends Bench.ForkOnly[Unit] { Channels.init[Unit](depth / 2, Access.Spsc).flatMap { q => for { - producer <- Fibers.fork(repeat(depth)(q.put(()))) - consumer <- Fibers.fork(repeat(depth)(q.take)) + producer <- Fibers.init(repeat(depth)(q.put(()))) + consumer <- Fibers.init(repeat(depth)(q.take)) _ <- producer.get _ <- consumer.get } yield {} diff --git a/kyo-bench/src/main/scala/kyo/bench/RendezvousBench.scala b/kyo-bench/src/main/scala/kyo/bench/RendezvousBench.scala index 2b6248b90..5db8565b8 100644 --- a/kyo-bench/src/main/scala/kyo/bench/RendezvousBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/RendezvousBench.scala @@ -107,8 +107,8 @@ class RendezvousBench extends Bench.ForkOnly[Int] { for { waiting <- Atomics.initRef[Any](null) - _ <- Fibers.fork(produce(waiting)) - consumer <- Fibers.fork(consume(waiting)) + _ <- Fibers.init(produce(waiting)) + consumer <- Fibers.init(consume(waiting)) res <- consumer.get } yield res } diff --git a/kyo-bench/src/main/scala/kyo/bench/SchedulingBench.scala b/kyo-bench/src/main/scala/kyo/bench/SchedulingBench.scala index d1876d777..76d7d8d54 100644 --- a/kyo-bench/src/main/scala/kyo/bench/SchedulingBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/SchedulingBench.scala @@ -45,7 +45,7 @@ class SchedulingBench extends Bench.ForkOnly[Int] { } Lists.traverse(range) { i => - Fibers.fork(fiber(i)) + Fibers.init(fiber(i)) }.map { fibers => Lists.traverse(fibers)(_.get) }.map(_.sum) diff --git a/kyo-bench/src/main/scala/kyo/bench/SemaphoreContentionBench.scala b/kyo-bench/src/main/scala/kyo/bench/SemaphoreContentionBench.scala index 94e342de6..34eeca4a7 100644 --- a/kyo-bench/src/main/scala/kyo/bench/SemaphoreContentionBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/SemaphoreContentionBench.scala @@ -52,7 +52,7 @@ class SemaphoreContentionBench extends Bench.ForkOnly[Unit] { for { sem <- Meters.initSemaphore(permits) cdl <- Latches.init(fibers) - _ <- repeat(fibers)(Fibers.fork(loop(sem, cdl))) + _ <- repeat(fibers)(Fibers.init(loop(sem, cdl))) _ <- cdl.await } yield {} } diff --git a/kyo-cache/shared/src/test/scala/kyo/concurrent/cachesTest.scala b/kyo-cache/shared/src/test/scala/kyo/concurrent/cachesTest.scala index 41f72134a..322ff5c1f 100644 --- a/kyo-cache/shared/src/test/scala/kyo/concurrent/cachesTest.scala +++ b/kyo-cache/shared/src/test/scala/kyo/concurrent/cachesTest.scala @@ -28,7 +28,7 @@ class cachesTest extends KyoTest { for { c <- Caches.init(_.maxSize(4)) m = c.memo { (v: Int) => - Fibers.fork[Int] { + Fibers.init[Int] { calls += 1 v + 1 }.map(_.get) @@ -44,7 +44,7 @@ class cachesTest extends KyoTest { for { c <- Caches.init(_.maxSize(4)) m = c.memo { (v: Int) => - Fibers.fork[Int] { + Fibers.init[Int] { calls += 1 if (calls == 1) IOs.fail(ex) diff --git a/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala b/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala index 2123b3518..bc95fc73b 100644 --- a/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala +++ b/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala @@ -25,9 +25,9 @@ object MonadLawsTest extends ZIOSpecDefault { Gen.oneOf( gen.map(v => (v: A > Fibers)), gen.map(v => IOs(v)), - gen.map(v => Fibers.fork(v).map(_.get)), - gen.map(v => IOs(Fibers.fork(v).map(_.get))), - gen.map(v => Fibers.fork(IOs(v)).map(_.get)) + gen.map(v => Fibers.init(v).map(_.get)), + gen.map(v => IOs(Fibers.init(v).map(_.get))), + gen.map(v => Fibers.init(IOs(v)).map(_.get)) ).map(Myo(_)) } diff --git a/kyo-core/shared/src/main/scala/kyo/App.scala b/kyo-core/shared/src/main/scala/kyo/App.scala index 0975d6609..526b50d15 100644 --- a/kyo-core/shared/src/main/scala/kyo/App.scala +++ b/kyo-core/shared/src/main/scala/kyo/App.scala @@ -55,7 +55,7 @@ object App { def v3: Try[T] > Fibers = Tries.run(v2) def v4: Try[T] > Fibers = Fibers.timeout(timeout)(v3) def v5: Try[T] > Fibers = Tries.run(v4).map(_.flatten) - def v6: Try[T] > Fibers = Fibers.fork(v5).map(_.get) + def v6: Try[T] > Fibers = Fibers.init(v5).map(_.get) def v7: Fiber[Try[T]] > IOs = Fibers.run(v6) IOs.run(v7) } diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala index 2abaf826b..e77b423d7 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala @@ -59,9 +59,9 @@ object fibers { def get: T > Fibers = state match { case promise: IOPromise[_] => - Fibers.join(state) + FiberGets(state) case failed: Failed => - Fibers.join(state) + FiberGets(state) case _ => state.asInstanceOf[T > Fibers] } @@ -85,7 +85,7 @@ object fibers { promise.onComplete { t => p.complete(Try(IOs.run(t))) } - Fibers.join(Fiber.promise(p)) + Fibers.get(Fiber.promise(p)) } case Failed(ex) => Failure(ex) @@ -157,7 +157,7 @@ object fibers { try t(state.asInstanceOf[T]) catch { case ex if (NonFatal(ex)) => - Fibers.fail(ex) + Fiber.failed(ex) } } } @@ -188,12 +188,6 @@ object fibers { def get[T, S](v: Fiber[T] > S): T > (Fibers with S) = v.map(_.get) - private[fibers] def join[T, S](v: Fiber[T] > S): T > (Fibers with S) = - FiberGets(v) - - def fail[T](ex: Throwable): Fiber[T] = - Fiber.failed(ex) - private val _promise = IOs(unsafeInitPromise[Object]) def initPromise[T]: Promise[T] > IOs = @@ -206,7 +200,7 @@ object fibers { private val IOTask = kyo.concurrent.scheduler.IOTask /*inline*/ - def fork[T]( /*inline*/ v: => T > Fibers)(implicit f: Flat[T > Fibers]): Fiber[T] > IOs = + def init[T]( /*inline*/ v: => T > Fibers)(implicit f: Flat[T > Fibers]): Fiber[T] > IOs = Locals.save.map(st => Fiber.promise(IOTask(IOs(v), st))) def parallel[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Seq[T] > Fibers = @@ -214,7 +208,7 @@ object fibers { case 0 => Seq.empty case 1 => l(0).map(Seq(_)) case _ => - Fibers.join(parallelFiber[T](l)) + Fibers.get(parallelFiber[T](l)) } def parallelFiber[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Fiber[Seq[T]] > IOs = @@ -256,7 +250,7 @@ object fibers { case 0 => IOs.fail("Can't race an empty list.") case 1 => l(0) case _ => - Fibers.join(raceFiber[T](l)) + Fibers.get(raceFiber[T](l)) } def raceFiber[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Fiber[T] > IOs = @@ -300,7 +294,7 @@ object fibers { } def timeout[T](d: Duration)(v: => T > Fibers)(implicit f: Flat[T > Fibers]): T > Fibers = - fork(v).map { f => + init(v).map { f => val timeout: Unit > IOs = IOs { IOTask(IOs(f.interrupt), Locals.State.empty) @@ -311,10 +305,10 @@ object fibers { } } - def join[T, S](f: Future[T]): T > Fibers = - Fibers.join(joinFiber(f)) + def fromFuture[T, S](f: Future[T]): T > Fibers = + Fibers.get(fromFutureFiber(f)) - def joinFiber[T](f: Future[T]): Fiber[T] > IOs = { + def fromFutureFiber[T](f: Future[T]): Fiber[T] > IOs = { Locals.save.map { st => IOs { val p = new IOPromise[T]() diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala index ed2f25a6d..da93320db 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/hubs.scala @@ -18,7 +18,7 @@ object hubs { Channels.init[T](capacity).map { ch => IOs { val listeners = new CopyOnWriteArraySet[Channel[T]] - Fibers.fork { + Fibers.init { def loop(): Unit > Fibers = ch.take.map { v => IOs { diff --git a/kyo-core/shared/src/test/scala/kyoTest/KyoAppTest.scala b/kyo-core/shared/src/test/scala/kyoTest/KyoAppTest.scala index 2beb350eb..a1a9fab28 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/KyoAppTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/KyoAppTest.scala @@ -31,7 +31,7 @@ class AppTest extends KyoTest { _ <- Consoles.println("1") _ <- Clocks.now _ <- Resources.ensure(()) - _ <- Fibers.fork(()) + _ <- Fibers.init(()) } yield () } app.main(Array()) diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala index 7ed0ab887..2f84df43e 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/channelsTest.scala @@ -52,8 +52,8 @@ class channelsTest extends KyoTest { b <- c.offer(1) put <- c.putFiber(2) f <- c.isFull - take1 <- Fibers.fork(c.takeFiber) - take2 <- Fibers.fork(c.takeFiber) + take1 <- Fibers.init(c.takeFiber) + take2 <- Fibers.init(c.takeFiber) v1 <- take1.get _ <- put.get v2 <- take1.get diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala index f33670f67..b6f8e91af 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala @@ -58,18 +58,18 @@ class fibersTest extends KyoTest { "fork" - { "value" in run { for { - v <- Fibers.fork(1).map(_.get) + v <- Fibers.init(1).map(_.get) } yield assert(v == 1) } "executes in a different thread" in runJVM { val t1 = Thread.currentThread() for { - t2 <- Fibers.fork(Thread.currentThread()).map(_.get) + t2 <- Fibers.init(Thread.currentThread()).map(_.get) } yield assert(t1 != t2) } "multiple" in run { for { - v0 <- Fibers.fork(0).map(_.get) + v0 <- Fibers.init(0).map(_.get) (v1, v2) <- Fibers.parallel(1, 2) (v3, v4, v5) <- Fibers.parallel(3, 4, 5) (v6, v7, v8, v9) <- Fibers.parallel(6, 7, 8, 9) @@ -78,7 +78,7 @@ class fibersTest extends KyoTest { "nested" in runJVM { val t1 = Thread.currentThread() for { - t2 <- Fibers.fork(IOs(Fibers.fork(Thread.currentThread()).map(_.get))).map(_.get) + t2 <- Fibers.init(IOs(Fibers.init(Thread.currentThread()).map(_.get))).map(_.get) } yield assert(t1 != t2) } } @@ -116,7 +116,7 @@ class fibersTest extends KyoTest { for { started <- Latches.init(1) done <- Latches.init(1) - fiber <- Fibers.fork(runLoop(started, done)) + fiber <- Fibers.init(runLoop(started, done)) _ <- started.await interrupted <- fiber.interrupt _ <- done.await @@ -126,9 +126,9 @@ class fibersTest extends KyoTest { for { started <- Latches.init(3) done <- Latches.init(3) - fiber1 <- Fibers.fork(runLoop(started, done)) - fiber2 <- Fibers.fork(runLoop(started, done)) - fiber3 <- Fibers.fork(runLoop(started, done)) + fiber1 <- Fibers.init(runLoop(started, done)) + fiber2 <- Fibers.init(runLoop(started, done)) + fiber3 <- Fibers.init(runLoop(started, done)) _ <- started.await interrupted1 <- fiber1.interrupt interrupted2 <- fiber2.interrupt @@ -275,7 +275,7 @@ class fibersTest extends KyoTest { "deep handler" - { "transform" in run { for { - v1 <- Fibers.fork(1).map(_.get) + v1 <- Fibers.init(1).map(_.get) (v2, v3) <- Fibers.parallel(2, 3) l <- Fibers.parallel(List[Int > Any](4, 5)) } yield assert(v1 + v2 + v3 + l.sum == 15) @@ -293,7 +293,7 @@ class fibersTest extends KyoTest { for { l <- Latches.init(1) - fiber <- Fibers.run(IOs.runLazy(Fibers.fork(task(l)))) + fiber <- Fibers.run(IOs.runLazy(Fibers.init(task(l)))) _ <- Fibers.sleep(10.millis) interrupted <- fiber.interrupt _ <- l.await @@ -313,7 +313,7 @@ class fibersTest extends KyoTest { for { r <- Resources.acquire(resource1) v1 <- IOs(r.incrementAndGet()) - v2 <- Fibers.fork(r.incrementAndGet()).map(_.get) + v2 <- Fibers.init(r.incrementAndGet()).map(_.get) } yield (r, Set(v1, v2)) Resources.run[(JAtomicInteger with Closeable, Set[Int]), Fibers](io1).map { case (r, v) => @@ -324,7 +324,7 @@ class fibersTest extends KyoTest { "inner" in run { val resource1 = new Resource val resource2 = new Resource - Fibers.fork(Resources.run(Resources.acquire(resource1).map(_.incrementAndGet()))) + Fibers.init(Resources.run(Resources.acquire(resource1).map(_.incrementAndGet()))) .map(_.get).map { r => assert(r == 1) assert(resource1.get() == -1) @@ -349,7 +349,7 @@ class fibersTest extends KyoTest { for { r <- Resources.acquire(resource1) v1 <- IOs(r.incrementAndGet()) - v2 <- Fibers.fork(r.incrementAndGet()).map(_.get) + v2 <- Fibers.init(r.incrementAndGet()).map(_.get) v3 <- Resources.run(Resources.acquire(resource2).map(_.incrementAndGet())) } yield Set(v1, v2, v3) Resources.run[Set[Int], Fibers](io1).map { r => @@ -364,10 +364,10 @@ class fibersTest extends KyoTest { val l = Locals.init(10) "fork" - { "default" in run { - Fibers.fork(l.get).map(_.get).map(v => assert(v == 10)) + Fibers.init(l.get).map(_.get).map(v => assert(v == 10)) } "let" in run { - l.let(20)(Fibers.fork(l.get).map(_.get)).map(v => assert(v == 20)) + l.let(20)(Fibers.init(l.get).map(_.get)).map(v => assert(v == 20)) } } "race" - { @@ -391,7 +391,7 @@ class fibersTest extends KyoTest { "stack safety" in run { def loop(i: Int): Assertion > Fibers = if (i > 0) { - Fibers.fork(List.fill(1000)(())).map(_ => loop(i - 1)) + Fibers.init(List.fill(1000)(())).map(_ => loop(i - 1)) } else { succeed } diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala index 455f58aa5..84c50e652 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/hubsTest.scala @@ -148,7 +148,7 @@ class hubsTest extends KyoTest { for { h <- Hubs.init[Int](2) l <- h.listen - _ <- Lists.fill(100)(Fibers.fork(h.put(1))) + _ <- Lists.fill(100)(Fibers.init(h.put(1))) t <- Lists.fill(100)(l.take) e1 <- h.isEmpty e2 <- l.isEmpty @@ -158,8 +158,8 @@ class hubsTest extends KyoTest { for { h <- Hubs.init[Int](2) l <- h.listen - _ <- Lists.fill(100)(Fibers.fork(h.put(1))) - t <- Lists.fill(100)(Fibers.fork(l.take).map(_.get)) + _ <- Lists.fill(100)(Fibers.init(h.put(1))) + t <- Lists.fill(100)(Fibers.init(l.take).map(_.get)) e1 <- h.isEmpty e2 <- l.isEmpty } yield assert(t == List.fill(100)(1) && e1 && e2) @@ -167,9 +167,9 @@ class hubsTest extends KyoTest { "listeners" in runJVM { for { h <- Hubs.init[Int](2) - l <- Lists.fill(100)(Fibers.fork(h.listen).map(_.get)) - _ <- Fibers.fork(h.put(1)) - t <- Lists.traverse(l)(l => Fibers.fork(l.take).map(_.get)) + l <- Lists.fill(100)(Fibers.init(h.listen).map(_.get)) + _ <- Fibers.init(h.put(1)) + t <- Lists.traverse(l)(l => Fibers.init(l.take).map(_.get)) e1 <- h.isEmpty e2 <- Lists.traverse(l)(_.isEmpty) } yield assert(t == List.fill(100)(1) && e1 && e2 == Lists.fill(100)(true)) diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/latchesTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/latchesTest.scala index b820dd0bc..abb4be748 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/latchesTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/latchesTest.scala @@ -36,7 +36,7 @@ class latchesTest extends KyoTest { "countDown + fibers + await" in runJVM { for { latch <- Latches.init(1) - _ <- Fibers.fork(latch.release) + _ <- Fibers.init(latch.release) _ <- latch.await } yield succeed } diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala index 9fb267899..7572c21d4 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala @@ -25,11 +25,11 @@ class metersTest extends KyoTest { t <- Meters.initMutex p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.fork(t.run(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block))) _ <- b1.get a1 <- t.isAvailable b2 <- Fibers.initPromise[Unit] - f2 <- Fibers.fork(b2.complete(()).map(_ => t.run(2))) + f2 <- Fibers.init(b2.complete(()).map(_ => t.run(2))) _ <- b2.get a2 <- t.isAvailable d1 <- f1.isDone @@ -46,7 +46,7 @@ class metersTest extends KyoTest { sem <- Meters.initSemaphore(1) p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.fork(sem.tryRun(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block))) _ <- b1.get a1 <- sem.isAvailable b1 <- sem.tryRun(2) @@ -71,14 +71,14 @@ class metersTest extends KyoTest { t <- Meters.initSemaphore(2) p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.fork(t.run(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block))) _ <- b1.get b2 <- Fibers.initPromise[Unit] - f2 <- Fibers.fork(t.run(b2.complete(()).map(_ => p.block))) + f2 <- Fibers.init(t.run(b2.complete(()).map(_ => p.block))) _ <- b2.get a1 <- t.isAvailable b3 <- Fibers.initPromise[Unit] - f2 <- Fibers.fork(b3.complete(()).map(_ => t.run(2))) + f2 <- Fibers.init(b3.complete(()).map(_ => t.run(2))) _ <- b3.get a2 <- t.isAvailable d1 <- f1.isDone @@ -95,10 +95,10 @@ class metersTest extends KyoTest { sem <- Meters.initSemaphore(2) p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.fork(sem.tryRun(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block))) _ <- b1.get b2 <- Fibers.initPromise[Unit] - f2 <- Fibers.fork(sem.tryRun(b2.complete(()).map(_ => p.block))) + f2 <- Fibers.init(sem.tryRun(b2.complete(()).map(_ => p.block))) _ <- b2.get a1 <- sem.isAvailable b3 <- sem.tryRun(2) @@ -126,7 +126,7 @@ class metersTest extends KyoTest { for { meter <- Meters.initRateLimiter(10, 10.millis) counter <- Atomics.initInt(0) - f1 <- Fibers.fork(loop(meter, counter)) + f1 <- Fibers.init(loop(meter, counter)) _ <- Fibers.sleep(50.millis) _ <- f1.interrupt v1 <- counter.get @@ -136,8 +136,8 @@ class metersTest extends KyoTest { for { meter <- Meters.initRateLimiter(10, 10.millis) counter <- Atomics.initInt(0) - f1 <- Fibers.fork(loop(meter, counter)) - f2 <- Fibers.fork(loop(meter, counter)) + f1 <- Fibers.init(loop(meter, counter)) + f2 <- Fibers.init(loop(meter, counter)) _ <- Fibers.sleep(50.millis) _ <- f1.interrupt _ <- f2.interrupt @@ -152,8 +152,8 @@ class metersTest extends KyoTest { for { meter <- Meters.pipeline(Meters.initRateLimiter(2, 1.millis), Meters.initMutex) counter <- Atomics.initInt(0) - f1 <- Fibers.fork(loop(meter, counter)) - f2 <- Fibers.fork(loop(meter, counter)) + f1 <- Fibers.init(loop(meter, counter)) + f2 <- Fibers.init(loop(meter, counter)) _ <- Fibers.sleep(50.millis) _ <- f1.interrupt _ <- f2.interrupt @@ -165,7 +165,7 @@ class metersTest extends KyoTest { // for { // meter <- Meters.pipeline(Meters.rateLimiter(2, 1.millis), Meters.mutex) // counter <- Atomics.initInt(0) - // f1 <- Fibers.fork(loop(meter, counter)) + // f1 <- Fibers.init(loop(meter, counter)) // _ <- Fibers.sleep(50.millis) // _ <- retry(meter.isAvailable(!_)) // _ <- Fibers.sleep(50.millis) diff --git a/kyo-sttp/js/src/main/scala/kyo/PlatformBackend.scala b/kyo-sttp/js/src/main/scala/kyo/PlatformBackend.scala index 58667225a..56393d38b 100644 --- a/kyo-sttp/js/src/main/scala/kyo/PlatformBackend.scala +++ b/kyo-sttp/js/src/main/scala/kyo/PlatformBackend.scala @@ -9,6 +9,6 @@ private[kyo] object PlatformBackend { new Backend { val b = FetchBackend() def send[T](r: Request[T, Any]) = - Fibers.join(r.send(b)) + Fibers.fromFuture(r.send(b)) } } diff --git a/kyo-tapir/src/main/scala/kyo/server/NettyKyoServer.scala b/kyo-tapir/src/main/scala/kyo/server/NettyKyoServer.scala index ac3285791..14c7af979 100644 --- a/kyo-tapir/src/main/scala/kyo/server/NettyKyoServer.scala +++ b/kyo-tapir/src/main/scala/kyo/server/NettyKyoServer.scala @@ -88,7 +88,7 @@ case class NettyKyoServer( ) = f => { val fiber: Fiber[ServerResponse[NettyResponse]] = - IOs.run(Fibers.run(IOs.runLazy(Fibers.fork[ServerResponse[NettyResponse]](f()))) + IOs.run(Fibers.run(IOs.runLazy(Fibers.init[ServerResponse[NettyResponse]](f()))) .map(_.transform(identity(_)))) ( IOs.run(fiber.toFuture), @@ -132,7 +132,7 @@ object NettyKyoServer { private[kyo] val runAsync = new RunAsync[KyoSttpMonad.M] { override def apply[T](f: => T > Fibers): Unit = IOs.run { - Fibers.fork { + Fibers.init { import Flat.unsafe.unchecked IOs.run(Fibers.run(IOs.runLazy(f)).unit) }