diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/Joins.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/Joins.scala index 2063d4a5e..6e89ead39 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/Joins.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/Joins.scala @@ -1,77 +1,77 @@ -package kyo.concurrent +// package kyo.concurrent -import kyo._ -import scala.annotation.implicitNotFound +// import kyo._ +// import scala.annotation.implicitNotFound -trait Joins[E] { +// trait Joins[E] { - def race[T](l: Seq[T < E])(implicit f: Flat[T < E]): T < E +// def race[T](l: Seq[T < E])(implicit f: Flat[T < E]): T < E - def parallel[T](l: Seq[T < E])(implicit f: Flat[T < E]): Seq[T] < E +// def parallel[T](l: Seq[T < E])(implicit f: Flat[T < E]): Seq[T] < E - def parallelTraverse[T, U](v: Seq[T] < E)(f: T => U < E)(implicit flat: Flat[U < E]): Seq[U] < E = - v.map(_.map(f)).map(parallel[U](_)) +// def parallelTraverse[T, U](v: Seq[T] < E)(f: T => U < E)(implicit flat: Flat[U < E]): Seq[U] < E = +// v.map(_.map(f)).map(parallel[U](_)) - def race[T]( - v1: => T < E, - v2: => T < E - )(implicit f: Flat[T < E]): T < E = - race(List(v1, v2)) +// def race[T]( +// v1: => T < E, +// v2: => T < E +// )(implicit f: Flat[T < E]): T < E = +// race(List(v1, v2)) - def race[T]( - v1: => T < E, - v2: => T < E, - v3: => T < E - )(implicit f: Flat[T < E]): T < E = - race(List(v1, v2, v3)) +// def race[T]( +// v1: => T < E, +// v2: => T < E, +// v3: => T < E +// )(implicit f: Flat[T < E]): T < E = +// race(List(v1, v2, v3)) - def race[T]( - v1: => T < E, - v2: => T < E, - v3: => T < E, - v4: => T < E - )(implicit f: Flat[T < E]): T < E = - race(List(v1, v2, v3, v4)) +// def race[T]( +// v1: => T < E, +// v2: => T < E, +// v3: => T < E, +// v4: => T < E +// )(implicit f: Flat[T < E]): T < E = +// race(List(v1, v2, v3, v4)) - def parallel[T1, T2]( - v1: => T1 < E, - v2: => T2 < E - )( - implicit - f1: Flat[T1 < E], - f2: Flat[T2 < E] - ): (T1, T2) < E = - parallel(List(v1, v2))(Flat.unsafe.checked).map(s => - (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2]) - ) +// def parallel[T1, T2]( +// v1: => T1 < E, +// v2: => T2 < E +// )( +// implicit +// f1: Flat[T1 < E], +// f2: Flat[T2 < E] +// ): (T1, T2) < E = +// parallel(List(v1, v2))(Flat.unsafe.checked).map(s => +// (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2]) +// ) - def parallel[T1, T2, T3]( - v1: => T1 < E, - v2: => T2 < E, - v3: => T3 < E - )( - implicit - f1: Flat[T1 < E], - f2: Flat[T2 < E], - f3: Flat[T3 < E] - ): (T1, T2, T3) < E = - parallel(List(v1, v2, v3))(Flat.unsafe.checked).map(s => - (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3]) - ) +// def parallel[T1, T2, T3]( +// v1: => T1 < E, +// v2: => T2 < E, +// v3: => T3 < E +// )( +// implicit +// f1: Flat[T1 < E], +// f2: Flat[T2 < E], +// f3: Flat[T3 < E] +// ): (T1, T2, T3) < E = +// parallel(List(v1, v2, v3))(Flat.unsafe.checked).map(s => +// (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3]) +// ) - def parallel[T1, T2, T3, T4]( - v1: => T1 < E, - v2: => T2 < E, - v3: => T3 < E, - v4: => T4 < E - )( - implicit - f1: Flat[T1 < E], - f2: Flat[T2 < E], - f3: Flat[T3 < E], - f4: Flat[T4 < E] - ): (T1, T2, T3, T4) < E = - parallel(List(v1, v2, v3, v4))(Flat.unsafe.checked).map(s => - (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3], s(3).asInstanceOf[T4]) - ) -} +// def parallel[T1, T2, T3, T4]( +// v1: => T1 < E, +// v2: => T2 < E, +// v3: => T3 < E, +// v4: => T4 < E +// )( +// implicit +// f1: Flat[T1 < E], +// f2: Flat[T2 < E], +// f3: Flat[T3 < E], +// f4: Flat[T4 < E] +// ): (T1, T2, T3, T4) < E = +// parallel(List(v1, v2, v3, v4))(Flat.unsafe.checked).map(s => +// (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3], s(3).asInstanceOf[T4]) +// ) +// } diff --git a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala index 7a0538fd9..3c6c3be96 100644 --- a/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala +++ b/kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala @@ -4,6 +4,7 @@ import kyo._ import kyo.core._ import kyo.core.internal._ import kyo.ios._ +import kyo.joins._ import kyo.locals._ import java.util.concurrent.atomic.AtomicInteger @@ -164,7 +165,7 @@ object fibers { type Fibers >: Fibers.Effects <: Fibers.Effects - object Fibers extends Joins[Fibers] { + object Fibers { type Effects = FiberGets with IOs @@ -203,6 +204,64 @@ object fibers { def init[T]( /*inline*/ v: => T < Fibers)(implicit f: Flat[T < Fibers]): Fiber[T] < IOs = Locals.save.map(st => Fiber.promise(IOTask(IOs(v), st))) + def never: Fiber[Unit] < IOs = + IOs(Fiber.promise(new IOPromise[Unit])) + + def delay[T, S](d: Duration)(v: => T < S): T < (S with Fibers) = + sleep(d).andThen(v) + + def sleep(d: Duration): Unit < Fibers = + initPromise[Unit].map { p => + if (d.isFinite) { + val run: Unit < IOs = + IOs { + IOTask(IOs(p.complete(())), Locals.State.empty) + () + } + Timers.schedule(d)(run).map { t => + IOs.ensure(t.cancel.unit)(p.get) + } + } else { + p.get + } + } + + def timeout[T](d: Duration)(v: => T < Fibers)(implicit f: Flat[T < Fibers]): T < Fibers = + init(v).map { f => + val timeout: Unit < IOs = + IOs { + IOTask(IOs(f.interrupt), Locals.State.empty) + () + } + Timers.schedule(d)(timeout).map { t => + IOs.ensure(t.cancel.unit)(f.get) + } + } + + def fromFuture[T, S](f: Future[T]): T < Fibers = + Fibers.get(fromFutureFiber(f)) + + def fromFutureFiber[T](f: Future[T]): Fiber[T] < IOs = { + Locals.save.map { st => + IOs { + val p = new IOPromise[T]() + f.onComplete { r => + val io = + IOs[Boolean, IOs] { + r match { + case Success(v) => + p.complete(v) + case Failure(ex) => + p.complete(IOs.fail(ex)) + } + } + IOTask(io, st) + }(ExecutionContext.parasitic) + Fiber.promise(p) + } + } + } + def parallel[T](l: Seq[T < Fibers])(implicit f: Flat[T < Fibers]): Seq[T] < Fibers = l.size match { case 0 => Seq.empty @@ -211,6 +270,13 @@ object fibers { Fibers.get(parallelFiber[T](l)) } + def parallel[T, S](j: Joins[S])(l: Seq[T < (S with Fibers)])( + implicit f: Flat[T < Fibers] + ): Seq[T] < (S with Fibers) = + j.save.map { st => + j.handle(st, l).map(parallel(_)).map(j.resume) + } + def parallelFiber[T](l: Seq[T < Fibers])(implicit f: Flat[T < Fibers]): Fiber[Seq[T]] < IOs = l.size match { case 0 => Fiber.done(Seq.empty) @@ -253,6 +319,13 @@ object fibers { Fibers.get(raceFiber[T](l)) } + def race[T, S](j: Joins[S])(l: Seq[T < (S with Fibers)])(implicit + f: Flat[T < (S with Fibers)] + ): T < (S with Fibers) = + j.save.map { st => + j.handle(st, l).map(race(_)).map(j.resume) + } + def raceFiber[T](l: Seq[T < Fibers])(implicit f: Flat[T < Fibers]): Fiber[T] < IOs = l.size match { case 0 => IOs.fail("Can't race an empty list.") @@ -271,63 +344,130 @@ object fibers { } } - def never: Fiber[Unit] < IOs = - IOs(Fiber.promise(new IOPromise[Unit])) - - def delay[T, S](d: Duration)(v: => T < S): T < (S with Fibers) = - sleep(d).andThen(v) - - def sleep(d: Duration): Unit < Fibers = - initPromise[Unit].map { p => - if (d.isFinite) { - val run: Unit < IOs = - IOs { - IOTask(IOs(p.complete(())), Locals.State.empty) - () - } - Timers.schedule(d)(run).map { t => - IOs.ensure(t.cancel.unit)(p.get) - } - } else { - p.get - } - } - - def timeout[T](d: Duration)(v: => T < Fibers)(implicit f: Flat[T < Fibers]): T < Fibers = - init(v).map { f => - val timeout: Unit < IOs = - IOs { - IOTask(IOs(f.interrupt), Locals.State.empty) - () - } - Timers.schedule(d)(timeout).map { t => - IOs.ensure(t.cancel.unit)(f.get) - } - } - - def fromFuture[T, S](f: Future[T]): T < Fibers = - Fibers.get(fromFutureFiber(f)) - - def fromFutureFiber[T](f: Future[T]): Fiber[T] < IOs = { - Locals.save.map { st => - IOs { - val p = new IOPromise[T]() - f.onComplete { r => - val io = - IOs[Boolean, IOs] { - r match { - case Success(v) => - p.complete(v) - case Failure(ex) => - p.complete(IOs.fail(ex)) - } - } - IOTask(io, st) - }(ExecutionContext.parasitic) - Fiber.promise(p) - } - } - } + def parallelTraverse[T, U](v: Seq[T] < Fibers)(f: T => U < Fibers)(implicit + flat: Flat[U < Fibers] + ): Seq[U] < Fibers = + v.map(_.map(f)).map(parallel[U](_)) + + def race[T]( + v1: => T < Fibers, + v2: => T < Fibers + )(implicit f: Flat[T < Fibers]): T < Fibers = + race(List(v1, v2)) + + def race[T]( + v1: => T < Fibers, + v2: => T < Fibers, + v3: => T < Fibers + )(implicit f: Flat[T < Fibers]): T < Fibers = + race(List(v1, v2, v3)) + + def race[T]( + v1: => T < Fibers, + v2: => T < Fibers, + v3: => T < Fibers, + v4: => T < Fibers + )(implicit f: Flat[T < Fibers]): T < Fibers = + race(List(v1, v2, v3, v4)) + + def race[T, S](j: Joins[S])( + v1: => T < (S with Fibers), + v2: => T < (S with Fibers) + )(implicit f: Flat[T < (S with Fibers)]): T < (S with Fibers) = + race(j)(List(v1, v2)) + + def race[T, S](j: Joins[S])( + v1: => T < (S with Fibers), + v2: => T < (S with Fibers), + v3: => T < (S with Fibers) + )(implicit f: Flat[T < (S with Fibers)]): T < (S with Fibers) = + race(j)(List(v1, v2, v3)) + + def race[T, S](j: Joins[S])( + v1: => T < (S with Fibers), + v2: => T < (S with Fibers), + v3: => T < (S with Fibers), + v4: => T < (S with Fibers) + )(implicit f: Flat[T < (S with Fibers)]): T < (S with Fibers) = + race(j)(List(v1, v2, v3, v4)) + + def parallel[T1, T2]( + v1: => T1 < Fibers, + v2: => T2 < Fibers + )(implicit + f1: Flat[T1 < Fibers], + f2: Flat[T2 < Fibers] + ): (T1, T2) < Fibers = + parallel(List(v1, v2))(Flat.unsafe.checked).map(s => + (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2]) + ) + + def parallel[T1, T2, T3]( + v1: => T1 < Fibers, + v2: => T2 < Fibers, + v3: => T3 < Fibers + )(implicit + f1: Flat[T1 < Fibers], + f2: Flat[T2 < Fibers], + f3: Flat[T3 < Fibers] + ): (T1, T2, T3) < Fibers = + parallel(List(v1, v2, v3))(Flat.unsafe.checked).map(s => + (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3]) + ) + + def parallel[T1, T2, T3, T4]( + v1: => T1 < Fibers, + v2: => T2 < Fibers, + v3: => T3 < Fibers, + v4: => T4 < Fibers + )(implicit + f1: Flat[T1 < Fibers], + f2: Flat[T2 < Fibers], + f3: Flat[T3 < Fibers], + f4: Flat[T4 < Fibers] + ): (T1, T2, T3, T4) < Fibers = + parallel(List(v1, v2, v3, v4))(Flat.unsafe.checked).map(s => + (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3], s(3).asInstanceOf[T4]) + ) + + def parallel[T1, T2, S](j: Joins[S])( + v1: => T1 < (S with Fibers), + v2: => T2 < (S with Fibers) + )(implicit + f1: Flat[T1 < (S with Fibers)], + f2: Flat[T2 < (S with Fibers)] + ): (T1, T2) < (S with Fibers) = + parallel(j)(List(v1, v2))(Flat.unsafe.checked).map(s => + (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2]) + ) + + def parallel[T1, T2, T3, S](j: Joins[S])( + v1: => T1 < (S with Fibers), + v2: => T2 < (S with Fibers), + v3: => T3 < (S with Fibers) + )(implicit + f1: Flat[T1 < (S with Fibers)], + f2: Flat[T2 < (S with Fibers)], + f3: Flat[T3 < (S with Fibers)] + ): (T1, T2, T3) < (S with Fibers) = + parallel(j)(List(v1, v2, v3))(Flat.unsafe.checked).map(s => + (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3]) + ) + + def parallel[T1, T2, T3, T4, S](j: Joins[S])( + v1: => T1 < (S with Fibers), + v2: => T2 < (S with Fibers), + v3: => T3 < (S with Fibers), + v4: => T4 < (S with Fibers) + )(implicit + f1: Flat[T1 < (S with Fibers)], + f2: Flat[T2 < (S with Fibers)], + f3: Flat[T3 < (S with Fibers)], + f4: Flat[T4 < (S with Fibers)] + ): (T1, T2, T3, T4) < (S with Fibers) = + parallel(List(v1, v2, v3, v4))(Flat.unsafe.checked).map(s => + (s(0).asInstanceOf[T1], s(1).asInstanceOf[T2], s(2).asInstanceOf[T3], s(3).asInstanceOf[T4]) + ) private def foreach[T, U](l: Seq[T])(f: T => Unit): Unit = { val it = l.iterator diff --git a/kyo-core/shared/src/main/scala/kyo/joins.scala b/kyo-core/shared/src/main/scala/kyo/joins.scala new file mode 100644 index 000000000..4455af821 --- /dev/null +++ b/kyo-core/shared/src/main/scala/kyo/joins.scala @@ -0,0 +1,121 @@ +package kyo + +import kyo._ +import kyo.core._ +import kyo.ios._ +import kyo.aborts._ +import kyo.envs._ +import kyo.seqs._ +import izumi.reflect._ +import kyo.resources.Resources + +object joins { + + trait Joins[E] { self => + + type M[_] + type State + + implicit def flat[T]: Flat[M[T]] = Flat.unsafe.unchecked + + def save: State < E + + def handle[T, S]( + s: State, + v: T < (E with S) + )(implicit + f: Flat[T < (E with S)] + ): M[T] < (S with IOs) + + def resume[T, S](v: M[T] < S): T < (E with S) + + def handle[T, S]( + s: State, + l: Seq[T < (E with S)] + )(implicit + f: Flat[T < (E with S)] + ): Seq[M[T]] < (S with IOs) = + Seqs.traverse(l.toList)(handle(s, _)) + + def resume[T, S](l: Seq[M[T] < S]): Seq[T] < (E with S with IOs) = + Seqs.collect(l.toList).map { ls => + Seqs.traverse(ls)(resume) + } + + def andThen[E2](j: Joins[E2]): Joins[E with E2] = + new Joins[E with E2] { + type M[T] = j.M[self.M[T]] + type State = (j.State, self.State) + + def save = zip(j.save, self.save) + + def handle[T, S]( + s: State, + v: T < (E & E2 & S) + )(implicit + f: Flat[T < (E & E2 & S)] + ) = + j.handle(s._1, self.handle(s._2, v)) + + def resume[T, S](v: M[T] < S) = + self.resume(j.resume(v)) + } + } + + object Joins { + def apply[E1: Joins]: Joins[E1] = + implicitly[Joins[E1]] + + def apply[E1: Joins, E2: Joins]: Joins[E1 with E2] = + Joins[E1].andThen(Joins[E2]) + + def apply[E1: Joins, E2: Joins, E3: Joins]: Joins[E1 with E2 with E3] = + Joins[E1].andThen(Joins[E2]).andThen(Joins[E3]) + + def apply[E1: Joins, E2: Joins, E3: Joins, E4: Joins]: Joins[E1 with E2 with E3 with E4] = + Joins[E1].andThen(Joins[E2]).andThen(Joins[E3]).andThen(Joins[E4]) + + def apply[E1: Joins, E2: Joins, E3: Joins, E4: Joins, E5: Joins] + : Joins[E1 with E2 with E3 with E4 with E5] = + Joins[E1].andThen(Joins[E2]).andThen(Joins[E3]).andThen(Joins[E4]).andThen(Joins[E5]) + + implicit def aborts[E: Tag]: Joins[Aborts[E]] = + new Joins[Aborts[E]] { + type State = Unit + type M[T] = Abort[E]#Value[T] + val aborts = Aborts[E] + + def save = () + def handle[T, S](s: State, v: T < (Aborts[E] & S))(implicit f: Flat[T < (Aborts[E] & S)]) = + aborts.run(v) + def resume[T, S](v: M[T] < S) = + aborts.get(v) + } + + implicit def envs[E: Tag]: Joins[Envs[E]] = + new Joins[Envs[E]] { + type State = E + type M[T] = T + val envs = Envs[E] + + def save = envs.get + def handle[T, S](s: State, v: T < (Envs[E] & S))(implicit f: Flat[T < (Envs[E] & S)]) = + envs.run(s)(v) + def resume[T, S](v: M[T] < S) = + v + } + + implicit val lists: Joins[Seqs] = + new Joins[Seqs] { + type State = Unit + type M[T] = Seq[T] + + def save = () + def handle[T, S](s: Unit, v: T < (Seqs & S))(implicit f: Flat[T < (Seqs & S)]) = + Seqs.run(v) + def resume[T, S](v: Seq[T] < S): T < (Seqs & S) = + Seqs.get(v) + } + + } +} diff --git a/kyo-llm/shared/src/main/scala/kyo/llm/ais.scala b/kyo-llm/shared/src/main/scala/kyo/llm/ais.scala index 74d1898a7..e1edf0b54 100644 --- a/kyo-llm/shared/src/main/scala/kyo/llm/ais.scala +++ b/kyo-llm/shared/src/main/scala/kyo/llm/ais.scala @@ -5,7 +5,6 @@ import kyo.llm.completions._ import kyo.llm.configs._ import kyo.llm.contexts._ import kyo.llm.agents._ -import kyo.concurrent.Joins import kyo.concurrent.atomics._ import kyo.concurrent.fibers._ import kyo.ios._ @@ -167,7 +166,7 @@ object ais { } yield r } - object AIs extends Joins[AIs] { + object AIs { type Effects = Sums[State] with Requests @@ -248,30 +247,6 @@ object ais { State.get.map { st => Tries.run[T, S](f).map(r => State.set(st).map(_ => r.get)) } - - def race[T](l: Seq[T < AIs])(implicit f: Flat[T < AIs]): T < AIs = - State.get.map { st => - Requests.race[(T, State)](l.map(State.run[T, Requests](st))) - .map { - case (v, st) => - State.set(st).map(_ => v) - } - } - - def parallel[T](l: Seq[T < AIs])(implicit f: Flat[T < AIs]): Seq[T] < AIs = - State.get.map { st => - Requests.parallel[(T, State)](l.map(State.run[T, Requests](st))) - .map { rl => - val r = rl.map(_._1) - val st = - rl.map(_._2) - .foldLeft(Map.empty: State) { - case (acc, st) => - summer.add(acc, st) - } - State.set(st).map(_ => r) - } - } } object internal { diff --git a/kyo-sttp/shared/src/main/scala/kyo/requests.scala b/kyo-sttp/shared/src/main/scala/kyo/requests.scala index f28dd726b..3ac10652d 100644 --- a/kyo-sttp/shared/src/main/scala/kyo/requests.scala +++ b/kyo-sttp/shared/src/main/scala/kyo/requests.scala @@ -1,7 +1,6 @@ package kyo import kyo._ -import kyo.concurrent.Joins import kyo.concurrent.fibers._ import kyo.envs._ import kyo.ios._ @@ -19,7 +18,7 @@ object requests { type Requests >: Requests.Effects <: Requests.Effects - object Requests extends Joins[Requests] { + object Requests { type Effects = Envs[Backend] with Fibers @@ -60,15 +59,5 @@ object requests { value } } - - def race[T](l: Seq[T < Requests])(implicit f: Flat[T < Requests]): T < Requests = - envs.get.map { b => - Fibers.race(l.map(Requests.run(b)(_))) - } - - def parallel[T](l: Seq[T < Requests])(implicit f: Flat[T < Requests]): Seq[T] < Requests = - envs.get.map { b => - Fibers.parallel(l.map(Requests.run(b)(_))) - } } } diff --git a/kyo-sttp/shared/src/test/scala/kyoTest/requestsTest.scala b/kyo-sttp/shared/src/test/scala/kyoTest/requestsTest.scala index 5419b56e2..cd1a2883c 100644 --- a/kyo-sttp/shared/src/test/scala/kyoTest/requestsTest.scala +++ b/kyo-sttp/shared/src/test/scala/kyoTest/requestsTest.scala @@ -42,17 +42,4 @@ class requestsTest extends KyoTest { } } } - "race" in run { - val backend = new TestBackend - Requests.run(backend) { - val call = Requests.request[String](Requests.basicRequest.get(uri"https://httpbin.org/get")) - for { - r <- Requests.race(call, call) - } yield { - assert(r == "mocked") - assert(backend.calls == 2) - } - } - } - }