Skip to content

Commit

Permalink
fiber.block timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Feb 27, 2024
1 parent 40d781d commit e812906
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 25 deletions.
3 changes: 2 additions & 1 deletion kyo-bench/src/main/scala/kyo/bench/Bench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.IO
import cats.effect.unsafe.implicits.global
import kyo.*
import org.openjdk.jmh.annotations.*
import scala.concurrent.duration.Duration
import zio.UIO

@State(Scope.Benchmark)
Expand Down Expand Up @@ -41,7 +42,7 @@ object Bench:

abstract class Fork[T](using f: Flat[T]) extends Bench[T]:
@Benchmark
def forkKyo(): T = IOs.run(Fibers.init(kyoBenchFiber()).flatMap(_.block))
def forkKyo(): T = IOs.run(Fibers.init(kyoBenchFiber()).flatMap(_.block(Duration.Inf)))

@Benchmark
def forkCats(): T = IO.cede.flatMap(_ => catsBench()).unsafeRunSync()
Expand Down
5 changes: 3 additions & 2 deletions kyo-core/js/src/main/scala/kyo/fibersSubs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package java.util.concurrent.locks
object LockSupport:
private def fail =
throw new UnsupportedOperationException("fiber.block is not supported in ScalaJS")
def park(): Unit = fail
def unpark(t: Thread): Unit = fail
def park(): Unit = fail
def parkNanos(o: Object, l: Long) = fail
def unpark(t: Thread): Unit = fail
end LockSupport
3 changes: 2 additions & 1 deletion kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kyoTest

import kyo.*
import kyo.Flat.unsafe.bypass
import scala.concurrent.duration.Duration
import zio.Trace
import zio.prelude.Equal
import zio.prelude.coherent.CovariantDeriveEqual
Expand Down Expand Up @@ -36,7 +37,7 @@ object MonadLawsTest extends ZIOSpecDefault:
override def derive[A: Equal]: Equal[Myo[A]] =
new Equal[Myo[A]]:
protected def checkEqual(l: Myo[A], r: Myo[A]): Boolean =
def run(m: Myo[A]): A = IOs.run(Fibers.runAndBlock(m.v))
def run(m: Myo[A]): A = IOs.run(Fibers.runAndBlock(Duration.Inf)(m.v))
run(l) == run(r)

def spec = suite("MonadLawsTest")(
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/KyoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object KyoApp:
def run[T](timeout: Duration)(v: T < Effects)(
using f: Flat[T < Effects]
): T =
IOs.run(runFiber(timeout)(v).block.map(_.get))
IOs.run(runFiber(timeout)(v).block(timeout).map(_.get))

def run[T](v: T < Effects)(
using f: Flat[T < Effects]
Expand Down
21 changes: 12 additions & 9 deletions kyo-core/shared/src/main/scala/kyo/fibers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ sealed abstract class Fiber[+T]:
def get: T < Fibers
def getTry: Try[T] < Fibers
def onComplete(f: T < IOs => Unit < IOs): Unit < IOs
def block: T < IOs
def block(timeout: Duration): T < IOs
def interrupt: Boolean < IOs
def toFuture: Future[T] < IOs
def transform[U: Flat](t: T => Fiber[U] < IOs): Fiber[U] < IOs
Expand All @@ -43,8 +43,8 @@ case class Promise[T: Flat](private[kyo] val p: IOPromise[T]) extends Fiber[T]:
def onComplete(f: T < IOs => Unit < IOs) =
IOs(p.onComplete(r => IOs.run(f(r))))

def block =
p.block
def block(timeout: Duration) =
p.block(timeout)

def interrupt =
IOs(p.interrupt())
Expand Down Expand Up @@ -97,14 +97,17 @@ object Fibers extends Joins[Fibers]:
def run[T](v: T < Fibers)(using f: Flat[T < Fibers]): Fiber[T] < IOs =
FiberGets.run(v)

def runAndBlock[T, S](v: T < (Fibers & S))(implicit
def runAndBlock[T, S](timeout: Duration)(v: T < (Fibers & S))(implicit
f: Flat[T < (Fibers & S)]
): T < (IOs & S) =
FiberGets.runAndBlock[T, S](v)
FiberGets.runAndBlock(timeout)(v)

def value[T: Flat](v: T): Fiber[T] =
Done(v)

def fail[T: Flat](ex: Throwable): Fiber[T] =
Done(IOs.fail(ex))

def get[T, S](v: Fiber[T] < S): T < (Fibers & S) =
v.map(_.get)

Expand Down Expand Up @@ -253,7 +256,7 @@ object fibersInternal:
def get = result
def getTry = IOs.attempt(result)
def onComplete(f: T < IOs => Unit < IOs) = f(result)
def block = result
def block(timeout: Duration) = result
def interrupt = false

def toFuture = Future.fromTry(Try(IOs.run(result)))
Expand All @@ -276,7 +279,7 @@ object fibersInternal:
IOs(deepHandle[Fiber, FiberGets, T, IOs](FiberGets)(IOs.runLazy(v)))
end run

def runAndBlock[T, S](v: T < (Fibers & S))(implicit
def runAndBlock[T, S](timeout: Duration)(v: T < (Fibers & S))(implicit
f: Flat[T < (Fibers & S)]
): T < (IOs & S) =
given Handler[Fiber, FiberGets, IOs] =
Expand All @@ -288,13 +291,13 @@ object fibersInternal:
try
m match
case m: Promise[T] @unchecked =>
m.block.map(f)
m.block(timeout).map(f)
case Done(v) =>
v.map(f)
catch
case ex if (NonFatal(ex)) =>
handle(ex)
IOs(handle[T, IOs & S, IOs](v).map(_.block))
IOs(handle[T, IOs & S, IOs](v).map(_.block(timeout)))
end runAndBlock
end FiberGets
val FiberGets = new FiberGets
Expand Down
16 changes: 12 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.LockSupport
import kyo.*
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

private[kyo] class IOPromise[T](state: State[T])
Expand Down Expand Up @@ -113,12 +114,15 @@ private[kyo] class IOPromise[T](state: State[T])
loop()
end complete

final def block: T < IOs =
final def block(timeout: Duration): T < IOs =
def loop(promise: IOPromise[T]): T < IOs =
promise.get() match
case _: Pending[T] @unchecked =>
IOs {
Scheduler.flush()
def now = System.currentTimeMillis()
val deadline =
if timeout.isFinite then now + timeout.toMillis else Long.MaxValue
val b = new (T < IOs => Unit) with (() => T < IOs):
@volatile
private var result: T < IOs = null.asInstanceOf[T < IOs]
Expand All @@ -127,9 +131,13 @@ private[kyo] class IOPromise[T](state: State[T])
result = v
LockSupport.unpark(waiter)
def apply() =
while result == null do
LockSupport.park()
result
while result == null && now <= deadline do
LockSupport.parkNanos(this, (deadline - now) * 1000000)
if result == null && now > deadline then
IOs.fail(Fibers.Interrupted)
else
result
end if
end apply
onComplete(b)
b()
Expand Down
11 changes: 10 additions & 1 deletion kyo-core/shared/src/test/scala/kyoTest/fibersTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,23 @@ class fibersTest extends KyoTest:
}

"timeout" in runJVM {
IOs.attempt(Fibers.runAndBlock(
IOs.attempt(Fibers.runAndBlock(Duration.Inf)(
Fibers.timeout(10.millis)(Fibers.sleep(1.day).andThen(1))
)).map {
case Failure(Fibers.Interrupted) => succeed
case v => fail(v.toString())
}
}

"block timeout" in runJVM {
IOs.attempt(Fibers.runAndBlock(10.millis)(
Fibers.sleep(1.day).andThen(1)
)).map {
case Failure(Fibers.Interrupted) => succeed
case v => fail(v.toString())
}
}

"interrupt" - {

def loop(ref: AtomicInt): Unit < IOs =
Expand Down
12 changes: 6 additions & 6 deletions kyo-core/shared/src/test/scala/kyoTest/metersTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class metersTest extends KyoTest:
t <- Meters.initMutex
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block)))
f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block(Duration.Inf))))
_ <- b1.get
a1 <- t.isAvailable
b2 <- Fibers.initPromise[Unit]
Expand All @@ -39,7 +39,7 @@ class metersTest extends KyoTest:
sem <- Meters.initSemaphore(1)
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block)))
f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block(Duration.Inf))))
_ <- b1.get
a1 <- sem.isAvailable
b1 <- sem.tryRun(2)
Expand All @@ -64,10 +64,10 @@ class metersTest extends KyoTest:
t <- Meters.initSemaphore(2)
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block)))
f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block(Duration.Inf))))
_ <- b1.get
b2 <- Fibers.initPromise[Unit]
f2 <- Fibers.init(t.run(b2.complete(()).map(_ => p.block)))
f2 <- Fibers.init(t.run(b2.complete(()).map(_ => p.block(Duration.Inf))))
_ <- b2.get
a1 <- t.isAvailable
b3 <- Fibers.initPromise[Unit]
Expand All @@ -88,10 +88,10 @@ class metersTest extends KyoTest:
sem <- Meters.initSemaphore(2)
p <- Fibers.initPromise[Int]
b1 <- Fibers.initPromise[Unit]
f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block)))
f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block(Duration.Inf))))
_ <- b1.get
b2 <- Fibers.initPromise[Unit]
f2 <- Fibers.init(sem.tryRun(b2.complete(()).map(_ => p.block)))
f2 <- Fibers.init(sem.tryRun(b2.complete(()).map(_ => p.block(Duration.Inf))))
_ <- b2.get
a1 <- sem.isAvailable
b3 <- sem.tryRun(2)
Expand Down

0 comments on commit e812906

Please sign in to comment.