Skip to content

Commit

Permalink
[data][prelude][core] Poll effect + handleFirst (#891)
Browse files Browse the repository at this point in the history
Introduces a new `Poll` effect that functions as the opposite of `Emit`.
I've also introduced `ArrowEffect.handleFirst` and `Emit.handleFirst`,
which might be useful for the work on Stream. I'm planning to use `Poll`
for an `Actor` implementation.

---------

Co-authored-by: Adam Hearn <[email protected]>
  • Loading branch information
fwbrasil and hearnadam authored Dec 5, 2024
1 parent 5ed25d4 commit 7baf292
Show file tree
Hide file tree
Showing 14 changed files with 949 additions and 114 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ The `Emit` effect is designed to accumulate values throughout a computation, sim
import kyo.*

// Add a value
val a: Emit.Ack < Emit[Int] =
val a: Ack < Emit[Int] =
Emit(42)

// Add multiple values
Expand Down
6 changes: 3 additions & 3 deletions kyo-core/jvm/src/main/scala/kyo/Path.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ class Path private (val path: List[String]) derives CanEqual:
Resource.acquireRelease(acquire)(release).map { res =>
readOnce(res).map { state =>
Loop(state) {
case Absent => Loop.done(Emit.Ack.Stop)
case Absent => Loop.done(Ack.Stop)
case Present(content) =>
Emit.andMap(writeOnce(content)) {
case Emit.Ack.Stop => Loop.done(Emit.Ack.Stop)
case _ => readOnce(res).map(Loop.continue(_))
case Ack.Stop => Loop.done(Ack.Stop)
case _ => readOnce(res).map(Loop.continue(_))
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions kyo-data/shared/src/main/scala/kyo/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ sealed abstract class Chunk[A] extends Seq[A] derives CanEqual:
final def append(v: A): Chunk[A] =
Append(this, v, length + 1)

/** Returns the first element of the Chunk wrapped in a Maybe.
*
* @return
* Maybe containing the first element if the Chunk is non-empty, or Maybe.empty if the Chunk is empty
*/
final def headMaybe: Maybe[A] =
Maybe.when(nonEmpty)(head)

/** Returns the last element of the Chunk.
*
* @return
Expand Down
22 changes: 22 additions & 0 deletions kyo-data/shared/src/test/scala/kyo/ChunkTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,28 @@ class ChunkTest extends Test:
}
}

"headMaybe" - {
"returns Present with the first element for a non-empty chunk" in {
val chunk = Chunk(1, 2, 3)
assert(chunk.headMaybe == Maybe(1))
}

"returns Absent for an empty chunk" in {
val chunk = Chunk.empty[Int]
assert(chunk.headMaybe.isEmpty)
}

"returns Present with the first element after appending" in {
val chunk = Chunk.empty[Int].append(1).append(2)
assert(chunk.headMaybe == Maybe(1))
}

"returns Present with the first element after dropping elements" in {
val chunk = Chunk(1, 2, 3, 4).dropLeft(2)
assert(chunk.headMaybe == Maybe(3))
}
}

"tail" - {
"returns the tail of a non-empty chunk" in {
val chunk = Chunk(1, 2, 3).toIndexed
Expand Down
66 changes: 66 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/Ack.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kyo

/** An acknowledgement type used to control emission of values.
*
* It is either a [[Continue]] with a positive maximum number of values to emit, or [[Stop]] to indicate that no more values should be
* emitted.
*/
opaque type Ack = Int
object Ack:
given CanEqual[Ack, Ack] = CanEqual.derived
inline given Flat[Ack] = Flat.unsafe.bypass
inline given Flat[Ack.Stop.type] = Flat.unsafe.bypass
inline given Flat[Ack.Continue] = Flat.unsafe.bypass

/** Creates an [[Ack]] from a maximum number of values to emit.
*
* @param maxValues
* The mamximum number of values to emit
* @return
* [[Continue]] if the maximum number of values is positive, [[Stop]] otherwise
*/
def apply(maxValues: Int): Ack = Math.max(0, maxValues)

extension (self: Ack)
/** Limits the acknowledgement to a maximum number of values.
*
* If this acknowledgement is [[Stop]] or `n` is non-positive then the returned acknowledgement is [[Stop]]. Otherwise, if this
* acknowledgement is [[Continue]] then the returned acknowledgement is [[Continue]] with the minimum of the current maximum number
* of values and `n`.
*
* @param n
* The maximum number of values to emit
* @return
* [[Continue]] if the minimum of the current maximum number of values and `n` is positive, [[Stop]] otherwise
*/
def maxValues(n: Int): Ack = Ack(Math.min(self, n))

/** Chains acknowledgements by executing a function only if not stopped.
*
* If the current acknowledgement is [[Stop]], returns [[Stop]] immediately. Otherwise, executes the provided function to get the
* next acknowledgement.
*
* @param f
* The function to execute to get the next acknowledgement
* @return
* [[Stop]] if current acknowledgement is [[Stop]], otherwise the result of `f`
*/
inline def next[S](inline f: => Ack < S): Ack < S =
if stop then Stop
else f

// Workaround for compiler issue with inlined `next`
private def stop: Boolean = self == Stop
end extension

/** Indicates to continue emitting values */
opaque type Continue <: Ack = Int
object Continue:
def apply(): Continue = Int.MaxValue

def unapply(ack: Ack): Maybe.Ops[Int] = Maybe.when(ack > 0)(ack)
end Continue

/** Indicates to stop emitting values */
val Stop: Ack = 0
end Ack
94 changes: 28 additions & 66 deletions kyo-prelude/shared/src/main/scala/kyo/Emit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,75 +11,10 @@ import kyo.kernel.*
* @tparam V
* The type of values that can be emitted
*/
sealed trait Emit[V] extends ArrowEffect[Const[V], Const[Emit.Ack]]
sealed trait Emit[V] extends ArrowEffect[Const[V], Const[Ack]]

object Emit:

/** An acknowledgement type used to control emission of values.
*
* It is either a [[Continue]] with a positive maximum number of values to emit, or [[Stop]] to indicate that no more values should be
* emitted.
*/
opaque type Ack = Int
object Ack:
given CanEqual[Ack, Ack] = CanEqual.derived
inline given Flat[Ack] = Flat.unsafe.bypass
inline given Flat[Ack.Stop.type] = Flat.unsafe.bypass
inline given Flat[Ack.Continue] = Flat.unsafe.bypass

/** Creates an [[Ack]] from a maximum number of values to emit.
*
* @param maxValues
* The mamximum number of values to emit
* @return
* [[Continue]] if the maximum number of values is positive, [[Stop]] otherwise
*/
def apply(maxValues: Int): Ack = Math.max(0, maxValues)

extension (self: Ack)
/** Limits the acknowledgement to a maximum number of values.
*
* If this acknowledgement is [[Stop]] or `n` is non-positive then the returned acknowledgement is [[Stop]]. Otherwise, if this
* acknowledgement is [[Continue]] then the returned acknowledgement is [[Continue]] with the minimum of the current maximum
* number of values and `n`.
*
* @param n
* The maximum number of values to emit
* @return
* [[Continue]] if the minimum of the current maximum number of values and `n` is positive, [[Stop]] otherwise
*/
def maxValues(n: Int): Ack = Ack(Math.min(self, n))

/** Chains acknowledgements by executing a function only if not stopped.
*
* If the current acknowledgement is [[Stop]], returns [[Stop]] immediately. Otherwise, executes the provided function to get
* the next acknowledgement.
*
* @param f
* The function to execute to get the next acknowledgement
* @return
* [[Stop]] if current acknowledgement is [[Stop]], otherwise the result of `f`
*/
inline def next[S](inline f: => Ack < S): Ack < S =
if stop then Stop
else f

// Workaround for compiler issue with inlined `next`
private def stop: Boolean = self == Stop
end extension

/** Indicates to continue emitting values */
opaque type Continue <: Ack = Int
object Continue:
def apply(): Continue = Int.MaxValue

def unapply(ack: Ack): Maybe.Ops[Int] = Maybe.when(ack > 0)(ack)
end Continue

/** Indicates to stop emitting values */
val Stop: Ack = 0
end Ack

/** Emits a single value.
*
* @param value
Expand Down Expand Up @@ -185,6 +120,33 @@ object Emit:

inline def runAck[V >: Nothing]: RunAckOps[V] = RunAckOps(())

final class RunFirstOps[V](dummy: Unit) extends AnyVal:

/** Runs an Emit effect, capturing only the first emitted value and returning a continuation.
*
* @param v
* The computation with Emit effect
* @return
* A tuple containing:
* - Maybe[V]: The first emitted value if any (None if no values were emitted)
* - A continuation function that takes an Ack and returns the remaining computation
*/
def apply[A: Flat, S](v: A < (Emit[V] & S))(using tag: Tag[Emit[V]], frame: Frame): (Maybe[V], Ack => A < (Emit[V] & S)) < S =
ArrowEffect.handleFirst(tag, v)(
handle = [C] =>
(input, cont) =>
// Effect found, return the input an continuation
(Maybe(input), cont),
done = r =>
// Effect not found, return empty input and a placeholder continuation
// that returns the result of the computation
(Maybe.empty[V], (_: Ack) => r: A < (Emit[V] & S))
)
end apply
end RunFirstOps

inline def runFirst[V >: Nothing]: RunFirstOps[V] = RunFirstOps(())

object isolate:

/** Creates an isolate that includes emitted values from isolated computations.
Expand Down
1 change: 0 additions & 1 deletion kyo-prelude/shared/src/main/scala/kyo/Parse.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kyo

import kyo.Ansi.*
import kyo.Emit.Ack
import kyo.kernel.*

/** The Parse effect combines three fundamental capabilities needed for parsing:
Expand Down
Loading

0 comments on commit 7baf292

Please sign in to comment.