From e8129062344fb8dd96c596ec70261c05d3cea38d Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 26 Feb 2024 23:30:46 -0800 Subject: [PATCH] fiber.block timeout --- .../src/main/scala/kyo/bench/Bench.scala | 3 ++- .../js/src/main/scala/kyo/fibersSubs.scala | 5 +++-- .../test/scala/kyoTest/MonadLawsTest.scala | 3 ++- .../shared/src/main/scala/kyo/KyoApp.scala | 2 +- .../shared/src/main/scala/kyo/fibers.scala | 21 +++++++++++-------- .../main/scala/kyo/scheduler/IOPromise.scala | 16 ++++++++++---- .../src/test/scala/kyoTest/fibersTest.scala | 11 +++++++++- .../src/test/scala/kyoTest/metersTest.scala | 12 +++++------ 8 files changed, 48 insertions(+), 25 deletions(-) diff --git a/kyo-bench/src/main/scala/kyo/bench/Bench.scala b/kyo-bench/src/main/scala/kyo/bench/Bench.scala index 1b72fc314..c3326be8b 100644 --- a/kyo-bench/src/main/scala/kyo/bench/Bench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/Bench.scala @@ -4,6 +4,7 @@ import cats.effect.IO import cats.effect.unsafe.implicits.global import kyo.* import org.openjdk.jmh.annotations.* +import scala.concurrent.duration.Duration import zio.UIO @State(Scope.Benchmark) @@ -41,7 +42,7 @@ object Bench: abstract class Fork[T](using f: Flat[T]) extends Bench[T]: @Benchmark - def forkKyo(): T = IOs.run(Fibers.init(kyoBenchFiber()).flatMap(_.block)) + def forkKyo(): T = IOs.run(Fibers.init(kyoBenchFiber()).flatMap(_.block(Duration.Inf))) @Benchmark def forkCats(): T = IO.cede.flatMap(_ => catsBench()).unsafeRunSync() diff --git a/kyo-core/js/src/main/scala/kyo/fibersSubs.scala b/kyo-core/js/src/main/scala/kyo/fibersSubs.scala index b53788667..06eb3350c 100644 --- a/kyo-core/js/src/main/scala/kyo/fibersSubs.scala +++ b/kyo-core/js/src/main/scala/kyo/fibersSubs.scala @@ -3,6 +3,7 @@ package java.util.concurrent.locks object LockSupport: private def fail = throw new UnsupportedOperationException("fiber.block is not supported in ScalaJS") - def park(): Unit = fail - def unpark(t: Thread): Unit = fail + def park(): Unit = fail + def parkNanos(o: Object, l: Long) = fail + def unpark(t: Thread): Unit = fail end LockSupport diff --git a/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala b/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala index 1101bf4df..8be2ca3a4 100644 --- a/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala +++ b/kyo-core/jvm/src/test/scala/kyoTest/MonadLawsTest.scala @@ -2,6 +2,7 @@ package kyoTest import kyo.* import kyo.Flat.unsafe.bypass +import scala.concurrent.duration.Duration import zio.Trace import zio.prelude.Equal import zio.prelude.coherent.CovariantDeriveEqual @@ -36,7 +37,7 @@ object MonadLawsTest extends ZIOSpecDefault: override def derive[A: Equal]: Equal[Myo[A]] = new Equal[Myo[A]]: protected def checkEqual(l: Myo[A], r: Myo[A]): Boolean = - def run(m: Myo[A]): A = IOs.run(Fibers.runAndBlock(m.v)) + def run(m: Myo[A]): A = IOs.run(Fibers.runAndBlock(Duration.Inf)(m.v)) run(l) == run(r) def spec = suite("MonadLawsTest")( diff --git a/kyo-core/shared/src/main/scala/kyo/KyoApp.scala b/kyo-core/shared/src/main/scala/kyo/KyoApp.scala index e900742a5..d2124573d 100644 --- a/kyo-core/shared/src/main/scala/kyo/KyoApp.scala +++ b/kyo-core/shared/src/main/scala/kyo/KyoApp.scala @@ -39,7 +39,7 @@ object KyoApp: def run[T](timeout: Duration)(v: T < Effects)( using f: Flat[T < Effects] ): T = - IOs.run(runFiber(timeout)(v).block.map(_.get)) + IOs.run(runFiber(timeout)(v).block(timeout).map(_.get)) def run[T](v: T < Effects)( using f: Flat[T < Effects] diff --git a/kyo-core/shared/src/main/scala/kyo/fibers.scala b/kyo-core/shared/src/main/scala/kyo/fibers.scala index 974ac263c..914f97fb7 100644 --- a/kyo-core/shared/src/main/scala/kyo/fibers.scala +++ b/kyo-core/shared/src/main/scala/kyo/fibers.scala @@ -18,7 +18,7 @@ sealed abstract class Fiber[+T]: def get: T < Fibers def getTry: Try[T] < Fibers def onComplete(f: T < IOs => Unit < IOs): Unit < IOs - def block: T < IOs + def block(timeout: Duration): T < IOs def interrupt: Boolean < IOs def toFuture: Future[T] < IOs def transform[U: Flat](t: T => Fiber[U] < IOs): Fiber[U] < IOs @@ -43,8 +43,8 @@ case class Promise[T: Flat](private[kyo] val p: IOPromise[T]) extends Fiber[T]: def onComplete(f: T < IOs => Unit < IOs) = IOs(p.onComplete(r => IOs.run(f(r)))) - def block = - p.block + def block(timeout: Duration) = + p.block(timeout) def interrupt = IOs(p.interrupt()) @@ -97,14 +97,17 @@ object Fibers extends Joins[Fibers]: def run[T](v: T < Fibers)(using f: Flat[T < Fibers]): Fiber[T] < IOs = FiberGets.run(v) - def runAndBlock[T, S](v: T < (Fibers & S))(implicit + def runAndBlock[T, S](timeout: Duration)(v: T < (Fibers & S))(implicit f: Flat[T < (Fibers & S)] ): T < (IOs & S) = - FiberGets.runAndBlock[T, S](v) + FiberGets.runAndBlock(timeout)(v) def value[T: Flat](v: T): Fiber[T] = Done(v) + def fail[T: Flat](ex: Throwable): Fiber[T] = + Done(IOs.fail(ex)) + def get[T, S](v: Fiber[T] < S): T < (Fibers & S) = v.map(_.get) @@ -253,7 +256,7 @@ object fibersInternal: def get = result def getTry = IOs.attempt(result) def onComplete(f: T < IOs => Unit < IOs) = f(result) - def block = result + def block(timeout: Duration) = result def interrupt = false def toFuture = Future.fromTry(Try(IOs.run(result))) @@ -276,7 +279,7 @@ object fibersInternal: IOs(deepHandle[Fiber, FiberGets, T, IOs](FiberGets)(IOs.runLazy(v))) end run - def runAndBlock[T, S](v: T < (Fibers & S))(implicit + def runAndBlock[T, S](timeout: Duration)(v: T < (Fibers & S))(implicit f: Flat[T < (Fibers & S)] ): T < (IOs & S) = given Handler[Fiber, FiberGets, IOs] = @@ -288,13 +291,13 @@ object fibersInternal: try m match case m: Promise[T] @unchecked => - m.block.map(f) + m.block(timeout).map(f) case Done(v) => v.map(f) catch case ex if (NonFatal(ex)) => handle(ex) - IOs(handle[T, IOs & S, IOs](v).map(_.block)) + IOs(handle[T, IOs & S, IOs](v).map(_.block(timeout))) end runAndBlock end FiberGets val FiberGets = new FiberGets diff --git a/kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala b/kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala index 66a8acf67..08489a53a 100644 --- a/kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala +++ b/kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.LockSupport import kyo.* import scala.annotation.tailrec +import scala.concurrent.duration.Duration import scala.util.control.NonFatal private[kyo] class IOPromise[T](state: State[T]) @@ -113,12 +114,15 @@ private[kyo] class IOPromise[T](state: State[T]) loop() end complete - final def block: T < IOs = + final def block(timeout: Duration): T < IOs = def loop(promise: IOPromise[T]): T < IOs = promise.get() match case _: Pending[T] @unchecked => IOs { Scheduler.flush() + def now = System.currentTimeMillis() + val deadline = + if timeout.isFinite then now + timeout.toMillis else Long.MaxValue val b = new (T < IOs => Unit) with (() => T < IOs): @volatile private var result: T < IOs = null.asInstanceOf[T < IOs] @@ -127,9 +131,13 @@ private[kyo] class IOPromise[T](state: State[T]) result = v LockSupport.unpark(waiter) def apply() = - while result == null do - LockSupport.park() - result + while result == null && now <= deadline do + LockSupport.parkNanos(this, (deadline - now) * 1000000) + if result == null && now > deadline then + IOs.fail(Fibers.Interrupted) + else + result + end if end apply onComplete(b) b() diff --git a/kyo-core/shared/src/test/scala/kyoTest/fibersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/fibersTest.scala index 969536d53..e7e48f6c6 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/fibersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/fibersTest.scala @@ -77,7 +77,7 @@ class fibersTest extends KyoTest: } "timeout" in runJVM { - IOs.attempt(Fibers.runAndBlock( + IOs.attempt(Fibers.runAndBlock(Duration.Inf)( Fibers.timeout(10.millis)(Fibers.sleep(1.day).andThen(1)) )).map { case Failure(Fibers.Interrupted) => succeed @@ -85,6 +85,15 @@ class fibersTest extends KyoTest: } } + "block timeout" in runJVM { + IOs.attempt(Fibers.runAndBlock(10.millis)( + Fibers.sleep(1.day).andThen(1) + )).map { + case Failure(Fibers.Interrupted) => succeed + case v => fail(v.toString()) + } + } + "interrupt" - { def loop(ref: AtomicInt): Unit < IOs = diff --git a/kyo-core/shared/src/test/scala/kyoTest/metersTest.scala b/kyo-core/shared/src/test/scala/kyoTest/metersTest.scala index 6da529852..fc19f9fff 100644 --- a/kyo-core/shared/src/test/scala/kyoTest/metersTest.scala +++ b/kyo-core/shared/src/test/scala/kyoTest/metersTest.scala @@ -18,7 +18,7 @@ class metersTest extends KyoTest: t <- Meters.initMutex p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block(Duration.Inf)))) _ <- b1.get a1 <- t.isAvailable b2 <- Fibers.initPromise[Unit] @@ -39,7 +39,7 @@ class metersTest extends KyoTest: sem <- Meters.initSemaphore(1) p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block(Duration.Inf)))) _ <- b1.get a1 <- sem.isAvailable b1 <- sem.tryRun(2) @@ -64,10 +64,10 @@ class metersTest extends KyoTest: t <- Meters.initSemaphore(2) p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(t.run(b1.complete(()).map(_ => p.block(Duration.Inf)))) _ <- b1.get b2 <- Fibers.initPromise[Unit] - f2 <- Fibers.init(t.run(b2.complete(()).map(_ => p.block))) + f2 <- Fibers.init(t.run(b2.complete(()).map(_ => p.block(Duration.Inf)))) _ <- b2.get a1 <- t.isAvailable b3 <- Fibers.initPromise[Unit] @@ -88,10 +88,10 @@ class metersTest extends KyoTest: sem <- Meters.initSemaphore(2) p <- Fibers.initPromise[Int] b1 <- Fibers.initPromise[Unit] - f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block))) + f1 <- Fibers.init(sem.tryRun(b1.complete(()).map(_ => p.block(Duration.Inf)))) _ <- b1.get b2 <- Fibers.initPromise[Unit] - f2 <- Fibers.init(sem.tryRun(b2.complete(()).map(_ => p.block))) + f2 <- Fibers.init(sem.tryRun(b2.complete(()).map(_ => p.block(Duration.Inf)))) _ <- b2.get a1 <- sem.isAvailable b3 <- sem.tryRun(2)