diff --git a/README.md b/README.md index d52cfe8a1..39231af83 100644 --- a/README.md +++ b/README.md @@ -1164,7 +1164,7 @@ val a: Queue[Int] > IOs = ) ``` -### Channels: Asynchronous Communication +### Channels: Backpressured Communication The `Channels` effect serves as an advanced concurrency primitive, designed to facilitate seamless and backpressured data transfer between various parts of your application. Built upon the `Fibers` effect, `Channels` not only ensures thread-safe communication but also incorporates a backpressure mechanism. This mechanism temporarily suspends fibers under specific conditions—either when waiting for new items to arrive or when awaiting space to add new items. diff --git a/kyo-bench/src/main/scala/kyo/bench/ChainedForkBench.scala b/kyo-bench/src/main/scala/kyo/bench/ChainedForkBench.scala index 503cfac8e..018b597c7 100644 --- a/kyo-bench/src/main/scala/kyo/bench/ChainedForkBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/ChainedForkBench.scala @@ -55,7 +55,7 @@ class ChainedForkBench extends Bench.ForkOnly[Int] { else IOs.unit.flatMap(_ => Fibers.forkFiber(iterate(p, n - 1)).unit) for { - p <- Fibers.promise[Unit] + p <- Fibers.initPromise[Unit] _ <- Fibers.forkFiber(iterate(p, depth)) _ <- p.get } yield 0 diff --git a/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala b/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala index 1b4509e14..cd1099b20 100644 --- a/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/ForkManyBench.scala @@ -47,7 +47,7 @@ class ForkManyBench extends Bench.ForkOnly[Int] { else io.flatMap(_ => repeat(n - 1)(io)) for { - promise <- Fibers.promise[Unit] + promise <- Fibers.initPromise[Unit] ref <- Atomics.initInt(depth) effect = ref.decrementAndGet.flatMap { case 1 => diff --git a/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala b/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala index e220af310..9d29ccf43 100644 --- a/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala @@ -71,7 +71,7 @@ class PingPongBench extends Bench.ForkOnly[Unit] { } yield () for { - promise <- Fibers.promise[Unit] + promise <- Fibers.initPromise[Unit] _ <- Fibers.forkFiber(iterate(promise, depth)) _ <- promise.get } yield () diff --git a/kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala b/kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala index b4f7ebf99..7fd25fc33 100644 --- a/kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala @@ -40,7 +40,7 @@ class SemaphoreBench extends Bench.ForkOnly[Unit] { else s.run(()).flatMap(_ => loop(s, i + 1)) - Meters.semaphore(1).flatMap(loop(_, 0)) + Meters.initSemaphore(1).flatMap(loop(_, 0)) } def zioBench(): UIO[Unit] = { diff --git a/kyo-chatgpt/jvm/src/main/scala/kyo/chatgpt/UI.scala b/kyo-chatgpt/jvm/src/main/scala/kyo/chatgpt/UI.scala index ab5874244..f4dfb5555 100644 --- a/kyo-chatgpt/jvm/src/main/scala/kyo/chatgpt/UI.scala +++ b/kyo-chatgpt/jvm/src/main/scala/kyo/chatgpt/UI.scala @@ -183,7 +183,7 @@ object UI extends scala.App { IOs.run { App.runFiber(Duration.Inf) { for { - p <- Fibers.promise[String] + p <- Fibers.initPromise[String] _ <- chan.put((message, enabledModes.toList, p)) r <- p.get } yield r diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala index 025b79039..53be570ed 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/channels.scala @@ -38,7 +38,7 @@ object channels { object Channels { def init[T](capacity: Int, access: Access = Access.Mpmc): Channel[T] > IOs = - Queues.bounded[T](capacity, access).map { queue => + Queues.initBounded[T](capacity, access).map { queue => IOs { new Channel[T] { @@ -71,7 +71,7 @@ object channels { if (u.offer(v)) { Fibers.value(()) } else { - val p = Fibers.unsafePromise[Unit] + val p = Fibers.unsafeInitPromise[Unit] puts.add((v, p)) p } @@ -88,7 +88,7 @@ object channels { case Some(v) => Fibers.value(v) case None => - val p = Fibers.unsafePromise[T] + val p = Fibers.unsafeInitPromise[T] takes.add(p) p } @@ -108,7 +108,7 @@ object channels { takes.add(p) case Some(v) => if (!p.unsafeComplete(v) && !u.offer(v)) { - val p = Fibers.unsafePromise[Unit] + val p = Fibers.unsafeInitPromise[Unit] puts.add((v, p)) } } diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala index f751082b0..2ac369801 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala @@ -243,12 +243,12 @@ object fibers { def fail[T](ex: Throwable): Fiber[T] = Fiber.failed(ex) - private val _promise = IOs(unsafePromise[Object]) + private val _promise = IOs(unsafeInitPromise[Object]) - def promise[T]: Promise[T] > IOs = + def initPromise[T]: Promise[T] > IOs = _promise.asInstanceOf[Promise[T] > IOs] - private[kyo] def unsafePromise[T]: Promise[T] = + private[kyo] def unsafeInitPromise[T]: Promise[T] = Fiber.promise(new IOPromise[T]()) // compiler bug workaround @@ -459,7 +459,7 @@ object fibers { IOs(Fiber.promise(new IOPromise[Unit])) def sleep(d: Duration): Unit > (IOs with Fibers with Timers) = - promise[Unit].map { p => + initPromise[Unit].map { p => if (d.isFinite) { val run: Unit > IOs = IOs { diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/latches.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/latches.scala index bd094f3cf..e311805a5 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/latches.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/latches.scala @@ -34,7 +34,7 @@ object latches { } else { IOs { new Latch { - val promise = Fibers.unsafePromise[Unit] + val promise = Fibers.unsafeInitPromise[Unit] val count = new AtomicInteger(n) val await = promise.get diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/meters.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/meters.scala index e21fcaf39..ca2e52f74 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/meters.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/meters.scala @@ -27,10 +27,10 @@ object meters { object Meters { - def mutex: Meter > IOs = - semaphore(1) + val initMutex: Meter > IOs = + initSemaphore(1) - def semaphore(concurrency: Int): Meter > IOs = + def initSemaphore(concurrency: Int): Meter > IOs = Channels.init[Unit](concurrency).map { chan => offer(concurrency, chan, ()).map { _ => new Meter { @@ -57,7 +57,7 @@ object meters { } } - def rateLimiter(rate: Int, period: Duration): Meter > (IOs with Timers) = + def initRateLimiter(rate: Int, period: Duration): Meter > (IOs with Timers) = Channels.init[Unit](rate).map { chan => Timers.scheduleAtFixedRate(period)(offer(rate, chan, ())).map { _ => new Meter { diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala index 8a5fc8ba8..f76c5d3ab 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/queues.scala @@ -49,7 +49,7 @@ object queues { def peek() = None } - def bounded[T](capacity: Int, access: Access = Access.Mpmc): Queue[T] > IOs = + def initBounded[T](capacity: Int, access: Access = Access.Mpmc): Queue[T] > IOs = IOs { capacity match { case c if (c <= 0) => @@ -80,7 +80,7 @@ object queues { } } - def unbounded[T](access: Access = Access.Mpmc, chunkSize: Int = 8): Unbounded[T] > IOs = + def initUnbounded[T](access: Access = Access.Mpmc, chunkSize: Int = 8): Unbounded[T] > IOs = IOs { access match { case Access.Mpmc => @@ -94,8 +94,8 @@ object queues { } } - def dropping[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs = - bounded[T](capacity, access).map { q => + def initDropping[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs = + initBounded[T](capacity, access).map { q => val u = q.unsafe val c = capacity new Unbounded( @@ -111,8 +111,8 @@ object queues { ) } - def sliding[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs = - bounded[T](capacity, access).map { q => + def initSliding[T](capacity: Int, access: Access = Access.Mpmc): Unbounded[T] > IOs = + initBounded[T](capacity, access).map { q => val u = q.unsafe val c = capacity new Unbounded( diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala index 443be7bf2..34e76ce1a 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/fibersTest.scala @@ -30,7 +30,7 @@ class fibersTest extends KyoTest { "promise" - { "complete" in run { for { - p <- Fibers.promise[Int] + p <- Fibers.initPromise[Int] a <- p.complete(1) b <- p.isDone c <- p.get @@ -38,7 +38,7 @@ class fibersTest extends KyoTest { } "complete twice" in run { for { - p <- Fibers.promise[Int] + p <- Fibers.initPromise[Int] a <- p.complete(1) b <- p.complete(2) c <- p.isDone @@ -48,7 +48,7 @@ class fibersTest extends KyoTest { "failure" in run { val ex = new Exception for { - p <- Fibers.promise[Int] + p <- Fibers.initPromise[Int] a <- p.complete(throw ex) b <- p.isDone c <- p.getTry diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala index f667cd30c..e5299a7f7 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/metersTest.scala @@ -15,20 +15,20 @@ class metersTest extends KyoTest { "mutex" - { "ok" in runJVM { for { - t <- Meters.mutex + t <- Meters.initMutex v <- t.run(2) } yield assert(v == 2) } "run" in runJVM { for { - t <- Meters.mutex - p <- Fibers.promise[Int] - b1 <- Fibers.promise[Unit] + t <- Meters.initMutex + p <- Fibers.initPromise[Int] + b1 <- Fibers.initPromise[Unit] f1 <- Fibers.forkFiber(t.run(b1.complete(()).map(_ => p.block))) _ <- b1.get a1 <- t.isAvailable - b2 <- Fibers.promise[Unit] + b2 <- Fibers.initPromise[Unit] f2 <- Fibers.forkFiber(b2.complete(()).map(_ => t.run(2))) _ <- b2.get a2 <- t.isAvailable @@ -43,9 +43,9 @@ class metersTest extends KyoTest { "tryRun" in runJVM { for { - sem <- Meters.semaphore(1) - p <- Fibers.promise[Int] - b1 <- Fibers.promise[Unit] + sem <- Meters.initSemaphore(1) + p <- Fibers.initPromise[Int] + b1 <- Fibers.initPromise[Unit] f1 <- Fibers.forkFiber(sem.tryRun(b1.complete(()).map(_ => p.block))) _ <- b1.get a1 <- sem.isAvailable @@ -60,7 +60,7 @@ class metersTest extends KyoTest { "semaphore" - { "ok" in runJVM { for { - t <- Meters.semaphore(2) + t <- Meters.initSemaphore(2) v1 <- t.run(2) v2 <- t.run(3) } yield assert(v1 == 2 && v2 == 3) @@ -68,16 +68,16 @@ class metersTest extends KyoTest { "run" in runJVM { for { - t <- Meters.semaphore(2) - p <- Fibers.promise[Int] - b1 <- Fibers.promise[Unit] + t <- Meters.initSemaphore(2) + p <- Fibers.initPromise[Int] + b1 <- Fibers.initPromise[Unit] f1 <- Fibers.forkFiber(t.run(b1.complete(()).map(_ => p.block))) _ <- b1.get - b2 <- Fibers.promise[Unit] + b2 <- Fibers.initPromise[Unit] f2 <- Fibers.forkFiber(t.run(b2.complete(()).map(_ => p.block))) _ <- b2.get a1 <- t.isAvailable - b3 <- Fibers.promise[Unit] + b3 <- Fibers.initPromise[Unit] f2 <- Fibers.forkFiber(b3.complete(()).map(_ => t.run(2))) _ <- b3.get a2 <- t.isAvailable @@ -92,12 +92,12 @@ class metersTest extends KyoTest { "tryRun" in runJVM { for { - sem <- Meters.semaphore(2) - p <- Fibers.promise[Int] - b1 <- Fibers.promise[Unit] + sem <- Meters.initSemaphore(2) + p <- Fibers.initPromise[Int] + b1 <- Fibers.initPromise[Unit] f1 <- Fibers.forkFiber(sem.tryRun(b1.complete(()).map(_ => p.block))) _ <- b1.get - b2 <- Fibers.promise[Unit] + b2 <- Fibers.initPromise[Unit] f2 <- Fibers.forkFiber(sem.tryRun(b2.complete(()).map(_ => p.block))) _ <- b2.get a1 <- sem.isAvailable @@ -117,14 +117,14 @@ class metersTest extends KyoTest { "rate limiter" - { "ok" in runJVM { for { - t <- Meters.rateLimiter(2, 10.millis) + t <- Meters.initRateLimiter(2, 10.millis) v1 <- t.run(2) v2 <- t.run(3) } yield assert(v1 == 2 && v2 == 3) } "one loop" in runJVM { for { - meter <- Meters.rateLimiter(10, 10.millis) + meter <- Meters.initRateLimiter(10, 10.millis) counter <- Atomics.initInt(0) f1 <- Fibers.forkFiber(loop(meter, counter)) _ <- Fibers.sleep(50.millis) @@ -134,7 +134,7 @@ class metersTest extends KyoTest { } "two loops" in runJVM { for { - meter <- Meters.rateLimiter(10, 10.millis) + meter <- Meters.initRateLimiter(10, 10.millis) counter <- Atomics.initInt(0) f1 <- Fibers.forkFiber(loop(meter, counter)) f2 <- Fibers.forkFiber(loop(meter, counter)) @@ -150,7 +150,7 @@ class metersTest extends KyoTest { "run" in runJVM { for { - meter <- Meters.pipeline(Meters.rateLimiter(2, 1.millis), Meters.mutex) + meter <- Meters.pipeline(Meters.initRateLimiter(2, 1.millis), Meters.initMutex) counter <- Atomics.initInt(0) f1 <- Fibers.forkFiber(loop(meter, counter)) f2 <- Fibers.forkFiber(loop(meter, counter)) diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala index 8c5767615..1a4c05eec 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/queuesTest.scala @@ -10,27 +10,27 @@ class queuesTest extends KyoTest { "bounded" - { "isEmpty" in run { for { - q <- Queues.bounded[Int](2) + q <- Queues.initBounded[Int](2) b <- q.isEmpty } yield assert(b) } "offer and poll" in run { for { - q <- Queues.bounded[Int](2) + q <- Queues.initBounded[Int](2) b <- q.offer(1) v <- q.poll } yield assert(b && v == Some(1)) } "peek" in run { for { - q <- Queues.bounded[Int](2) + q <- Queues.initBounded[Int](2) _ <- q.offer(1) v <- q.peek } yield assert(v == Some(1)) } "full" in run { for { - q <- Queues.bounded[Int](2) + q <- Queues.initBounded[Int](2) _ <- q.offer(1) _ <- q.offer(2) b <- q.offer(3) @@ -41,27 +41,27 @@ class queuesTest extends KyoTest { "unbounded" - { "isEmpty" in run { for { - q <- Queues.unbounded[Int]() + q <- Queues.initUnbounded[Int]() b <- q.isEmpty } yield assert(b) } "offer and poll" in run { for { - q <- Queues.unbounded[Int]() + q <- Queues.initUnbounded[Int]() b <- q.offer(1) v <- q.poll } yield assert(b && v == Some(1)) } "peek" in run { for { - q <- Queues.unbounded[Int]() + q <- Queues.initUnbounded[Int]() _ <- q.offer(1) v <- q.peek } yield assert(v == Some(1)) } "add and poll" in run { for { - q <- Queues.unbounded[Int]() + q <- Queues.initUnbounded[Int]() _ <- q.add(1) v <- q.poll } yield assert(v == Some(1)) @@ -70,7 +70,7 @@ class queuesTest extends KyoTest { "dropping" in run { for { - q <- Queues.dropping[Int](2) + q <- Queues.initDropping[Int](2) _ <- q.add(1) _ <- q.add(2) _ <- q.add(3) @@ -82,7 +82,7 @@ class queuesTest extends KyoTest { "sliding" in run { for { - q <- Queues.sliding[Int](2) + q <- Queues.initSliding[Int](2) _ <- q.add(1) _ <- q.add(2) _ <- q.add(3) diff --git a/kyo-core/shared/src/test/scala/kyoTest/concurrent/timersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/concurrent/timersTest.scala index 9b05d6968..2034f1877 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/concurrent/timersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/concurrent/timersTest.scala @@ -14,7 +14,7 @@ class timersTest extends KyoTest { "schedule" in run { for { - p <- Fibers.promise[String] + p <- Fibers.initPromise[String] _ <- Timers.schedule(1.milli)(p.complete("hello").map(require(_))) hello <- p.get } yield assert(hello == "hello") @@ -22,7 +22,7 @@ class timersTest extends KyoTest { "cancel" in runJVM { for { - p <- Fibers.promise[String] + p <- Fibers.initPromise[String] task <- Timers.schedule(5.second)(p.complete("hello").map(require(_))) _ <- task.cancel cancelled <- retry(task.isCancelled)