Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Dec 8, 2023
1 parent b2d4ec5 commit 89aa066
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 45 deletions.
32 changes: 17 additions & 15 deletions kyo-core/shared/src/main/scala/kyo/concurrent/fibers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ object fibers {
Locals.save.map(st => Fiber.promise(IOTask(IOs(v), st)))

/*inline*/
def init[T, S](j: Joins[S])( /*inline*/ v: => T > (S with Fibers))(implicit f: Flat[T > (S with Fibers)]): Fiber[j.M[T]] > (S with IOs) =
def init[T, S](j: Joins[S])( /*inline*/ v: => T > (S with Fibers))(implicit
f: Flat[T > (S with Fibers)]
): Fiber[j.M[T]] > (S with IOs) =
j.save.map { st =>
init(j.handle(st, v))
}
Expand Down Expand Up @@ -268,13 +270,6 @@ object fibers {
}
}

def parallel[T, S](j: Joins[S])(l: Seq[T > (S with Fibers)])(
implicit f: Flat[T > Fibers]
): Seq[T] > (S with Fibers) =
j.save.map { st =>
j.handle(st, l).map(parallel(_)).map(j.resume)
}

def parallel[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Seq[T] > Fibers =
l.size match {
case 0 => Seq.empty
Expand All @@ -283,6 +278,13 @@ object fibers {
Fibers.get(parallelFiber[T](l))
}

def parallel[T, S](j: Joins[S])(l: Seq[T > (S with Fibers)])(
implicit f: Flat[T > Fibers]
): Seq[T] > (S with Fibers) =
j.save.map { st =>
j.handle(st, l).map(parallel(_)).map(j.resume)
}

def parallelFiber[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Fiber[Seq[T]] > IOs =
l.size match {
case 0 => Fiber.done(Seq.empty)
Expand Down Expand Up @@ -317,13 +319,6 @@ object fibers {
}
}

def race[T, S](j: Joins[S])(l: Seq[T > (S with Fibers)])(implicit
f: Flat[T > (S with Fibers)]
): T > (S with Fibers) =
j.save.map { st =>
j.handle(st, l).map(race(_)).map(j.resume)
}

def race[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): T > Fibers =
l.size match {
case 0 => IOs.fail("Can't race an empty list.")
Expand All @@ -332,6 +327,13 @@ object fibers {
Fibers.get(raceFiber[T](l))
}

def race[T, S](j: Joins[S])(l: Seq[T > (S with Fibers)])(implicit
f: Flat[T > (S with Fibers)]
): T > (S with Fibers) =
j.save.map { st =>
j.handle(st, l).map(race(_)).map(j.resume)
}

def raceFiber[T](l: Seq[T > Fibers])(implicit f: Flat[T > Fibers]): Fiber[T] > IOs =
l.size match {
case 0 => IOs.fail("Can't race an empty list.")
Expand Down
44 changes: 44 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/joins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package kyo
import kyo._
import kyo.core._
import kyo.ios._
import kyo.aborts._
import kyo.envs._
import kyo.lists.Lists
import izumi.reflect._
import kyo.resources.Resources

object joins {

Expand Down Expand Up @@ -74,5 +78,45 @@ object joins {
def apply[E1: Joins, E2: Joins, E3: Joins, E4: Joins, E5: Joins]
: Joins[E1 with E2 with E3 with E4 with E5] =
Joins[E1].andThen(Joins[E2]).andThen(Joins[E3]).andThen(Joins[E4]).andThen(Joins[E5])

implicit def aborts[E: Tag]: Joins[Aborts[E]] =
new Joins[Aborts[E]] {
type State = Unit
type M[T] = Abort[E]#Value[T]
val aborts = Aborts[E]

def save = ()
def handle[T, S](s: State, v: T > (Aborts[E] & S))(implicit f: Flat[T > (Aborts[E] & S)]) =
aborts.run(v)
def resume[T, S](v: M[T] > S) =
aborts.get(v)
}

implicit def envs[E: Tag]: Joins[Envs[E]] =
new Joins[Envs[E]] {
type State = E
type M[T] = T
val envs = Envs[E]

def save = envs.get
def handle[T, S](s: State, v: T > (Envs[E] & S))(implicit f: Flat[T > (Envs[E] & S)]) =
envs.run(s)(v)
def resume[T, S](v: M[T] > S) =
v
}

implicit val lists: Joins[Lists] =
new Joins[Lists] {
type State = Unit
type M[T] = List[T]

def save = ()
def handle[T, S](s: Unit, v: T > (Lists & S))(implicit f: Flat[T > (Lists & S)]) =
Lists.run(v)
def resume[T, S](v: List[T] > S): T > (Lists & S) =
Lists.foreach(v)
}


}
}
8 changes: 4 additions & 4 deletions kyo-direct/src/test/scala/kyoTest/directTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ class directTest extends KyoTest {
"lists" in {
import kyo.lists._

val x = Lists.foreach(List(1, -2, -3))
val y = Lists.foreach(List("ab", "cde"))
val x = Lists.get(List(1, -2, -3))
val y = Lists.get(List("ab", "cde"))

val v: Int > Lists =
defer {
Expand All @@ -172,8 +172,8 @@ class directTest extends KyoTest {
"lists + filter" in {
import kyo.lists._

val x = Lists.foreach(List(1, -2, -3))
val y = Lists.foreach(List("ab", "cde"))
val x = Lists.get(List(1, -2, -3))
val y = Lists.get(List("ab", "cde"))

val v: Int > Lists =
defer {
Expand Down
27 changes: 1 addition & 26 deletions kyo-llm/shared/src/main/scala/kyo/llm/ais.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import kyo.llm.completions._
import kyo.llm.configs._
import kyo.llm.contexts._
import kyo.llm.tools._
import kyo.concurrent.Joins
import kyo.concurrent.atomics._
import kyo.concurrent.fibers._
import kyo.ios._
Expand Down Expand Up @@ -155,7 +154,7 @@ object ais {
} yield r
}

object AIs extends Joins[AIs] {
object AIs {

type Effects = Sums[State] with Requests

Expand Down Expand Up @@ -220,30 +219,6 @@ object ais {
State.get.map { st =>
Tries.run[T, S](f).map(r => State.set(st).map(_ => r.get))
}

def race[T](l: Seq[T > AIs])(implicit f: Flat[T > AIs]): T > AIs =
State.get.map { st =>
Requests.race[(T, State)](l.map(State.run[T, Requests](st)))
.map {
case (v, st) =>
State.set(st).map(_ => v)
}
}

def parallel[T](l: Seq[T > AIs])(implicit f: Flat[T > AIs]): Seq[T] > AIs =
State.get.map { st =>
Requests.parallel[(T, State)](l.map(State.run[T, Requests](st)))
.map { rl =>
val r = rl.map(_._1)
val st =
rl.map(_._2)
.foldLeft(Map.empty: State) {
case (acc, st) =>
summer.add(acc, st)
}
State.set(st).map(_ => r)
}
}
}

object internal {
Expand Down

0 comments on commit 89aa066

Please sign in to comment.