Skip to content

Commit

Permalink
[core] fix IOPromise exception handling (#933)
Browse files Browse the repository at this point in the history
I noticed an odd scenario in a project I'm working on. The `IOPromise`
flushing was throwing an exception due to a buggy `onInterrupt` function
in the project. This PR ensures exception handling is present in all
callback function evaluations.
  • Loading branch information
fwbrasil authored Dec 16, 2024
1 parent d093cee commit 3febb53
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 20 deletions.
35 changes: 15 additions & 20 deletions kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ private[kyo] class IOPromise[+E, +A](init: State[E, A]) extends Safepoint.Interc
case l: Linked[E, A] @unchecked =>
interruptsLoop(l.p)
case _ =>
try discard(other.interrupt(Result.Panic(Interrupt(frame))))
catch
case ex if NonFatal(ex) =>
import AllowUnsafe.embrace.danger
Log.live.unsafe.error("uncaught exception", ex)
discard(other.interrupt(Result.Panic(Interrupt(frame))))
interruptsLoop(this)
end interrupts

Expand Down Expand Up @@ -128,12 +124,7 @@ private[kyo] class IOPromise[+E, +A](init: State[E, A]) extends Safepoint.Interc
case l: Linked[E, A] @unchecked =>
onCompleteLoop(l.p)
case v =>
try f(v.asInstanceOf[Result[E, A]])
catch
case ex if NonFatal(ex) =>
given Frame = Frame.internal
import AllowUnsafe.embrace.danger
Log.live.unsafe.error("uncaught exception", ex)
IOPromise.eval(f, v.asInstanceOf[Result[E, A]])
onCompleteLoop(this)
end onComplete

Expand Down Expand Up @@ -246,16 +237,10 @@ private[kyo] object IOPromise:
new Pending[E, A]:
def waiters: Int = self.waiters + 1
def interrupt[E2 >: E](error: Error[E2]) =
f(error.asInstanceOf[Error[E]])
eval(f, error.asInstanceOf[Error[E]])
self
def run[E2 >: E, A2 >: A](v: Result[E2, A2]) =
try f(v.asInstanceOf[Result[E, A]])
catch
case ex if NonFatal(ex) =>
given Frame = Frame.internal
import AllowUnsafe.embrace.danger
Log.live.unsafe.error("uncaught exception", ex)
end try
eval(f, v.asInstanceOf[Result[E, A]])
self
end run

Expand All @@ -272,7 +257,7 @@ private[kyo] object IOPromise:
inline def onInterrupt(inline f: Error[E] => Unit): Pending[E, A] =
new Pending[E, A]:
def interrupt[E2 >: E](error: Error[E2]) =
f(error.asInstanceOf[Error[E]])
eval(f, error.asInstanceOf[Error[E]])
self
def waiters: Int = self.waiters + 1
def run[E2 >: E, A2 >: A](v: Result[E2, A2]) =
Expand Down Expand Up @@ -325,4 +310,14 @@ private[kyo] object IOPromise:
def run[E2, A2](v: Result[E2, A2]) = this
end Empty
end Pending

private inline def eval[A, B](inline f: A => Unit, value: A): Unit =
try f(value)
catch
case ex if NonFatal(ex) =>
given Frame = Frame.internal
import AllowUnsafe.embrace.danger
Log.live.unsafe.error("uncaught exception", ex)
end try
end eval
end IOPromise
80 changes: 80 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/scheduler/IOPromiseTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -758,4 +758,84 @@ class IOPromiseTest extends Test:
}
}

"exception handling" - {
val ex = new RuntimeException("test exception")

"multiple callbacks with exceptions" in {
val p = new IOPromise[Nothing, Int]()
var firstCallbackExecuted = false
var lastCallbackExecuted = false

p.onComplete(_ => firstCallbackExecuted = true)
p.onComplete(_ => throw ex)
p.onComplete(_ => throw ex)
p.onComplete(_ => lastCallbackExecuted = true)

p.complete(Result.success(42))
assert(firstCallbackExecuted)
assert(lastCallbackExecuted)
}

"exceptions in onInterrupt callbacks" in {
val p = new IOPromise[Nothing, Int]()
var firstCallbackExecuted = false
var lastCallbackExecuted = false

p.onComplete(_ => firstCallbackExecuted = true)
p.onInterrupt(_ => throw ex)
p.onInterrupt(_ => lastCallbackExecuted = true)

p.interrupt(Result.Panic(new Exception("Test interrupt")))
assert(firstCallbackExecuted)
assert(lastCallbackExecuted)
}

"nested callbacks with exceptions" in {
val p1 = new IOPromise[Nothing, Int]()
val p2 = new IOPromise[Nothing, Int]()
var innerCallbackExecuted = false

p1.onComplete { _ =>
throw ex
discard(p2.complete(Result.success(42)))
}

p2.onComplete(_ => innerCallbackExecuted = true)

p1.complete(Result.success(1))
assert(!innerCallbackExecuted)

p2.complete(Result.success(42))
assert(innerCallbackExecuted)
}

"exceptions during promise chaining" in {
val p1 = new IOPromise[Nothing, Int]()
val p2 = new IOPromise[Nothing, Int]()
var chainCompleted = false

p1.onComplete { _ =>
p2.onComplete(_ => throw new RuntimeException("test exception"))
p2.onComplete(_ => chainCompleted = true)
}

p1.complete(Result.success(1))
p2.complete(Result.success(2))

assert(chainCompleted)
}

"exceptions with masked promises" in {
val original = new IOPromise[Nothing, Int]()
val masked = original.mask()
var maskedCallbackExecuted = false

masked.onComplete(_ => throw ex)
masked.onComplete(_ => maskedCallbackExecuted = true)

original.complete(Result.success(42))
assert(maskedCallbackExecuted)
}
}

end IOPromiseTest

0 comments on commit 3febb53

Please sign in to comment.