Skip to content

Commit

Permalink
Merge releases/0.8 (#331)
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland authored Mar 30, 2023
1 parent f12182d commit f6142e2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ object CircuitBreaker {
for {
currentState <- state.get
result <- currentState match {
case Closed =>
case Closed =>
// The state may have already changed to Open or even HalfOpen.
// This can happen if we fire X calls in parallel where X >= 2 * maxFailures
def onComplete(callSuccessful: Boolean) =
Expand All @@ -201,33 +201,41 @@ object CircuitBreaker {
_ <- changeToOpen.when(currentState == Closed && shouldTrip)
} yield ()).uninterruptible

f.either.flatMap {
case Left(e) if isFailure.applyOrElse[E1, Boolean](e, _ => false) => ZIO.fail(e)
case Left(e) => ZIO.left(WrappedError(e))
case Right(e) => ZIO.right(e)
tapZIOOnUserDefinedFailure(f)(
onFailure = onComplete(callSuccessful = false),
onSuccess = onComplete(callSuccessful = true)
).mapError(WrappedError(_))

}
.tapBoth(_ => onComplete(callSuccessful = false), _ => onComplete(callSuccessful = true))
.mapError(WrappedError(_))
.absolve
case Open =>
ZIO.fail(CircuitBreakerOpen)
case HalfOpen =>
for {
isFirstCall <- halfOpenSwitch.getAndUpdate(_ => false)
result <- if (isFirstCall) {
f.mapError(WrappedError(_))
.tapBoth(
_ => (strategy.shouldTrip(false) *> changeToOpen).uninterruptible,
_ => (changeToClosed *> strategy.onReset).uninterruptible
)
tapZIOOnUserDefinedFailure(f)(
onFailure = (strategy.shouldTrip(false) *> changeToOpen).uninterruptible,
onSuccess = (changeToClosed *> strategy.onReset).uninterruptible
).mapError(WrappedError(_))
} else {
ZIO.fail(CircuitBreakerOpen)
}
} yield result
}
} yield result

private def tapZIOOnUserDefinedFailure[R, E1 <: E, A](
f: ZIO[R, E1, A]
)(onFailure: ZIO[R, E1, Any], onSuccess: ZIO[R, E1, Any]): ZIO[R, E1, A] =
f.tapBoth(
{
case e if isFailure.applyOrElse[E1, Boolean](e, _ => false) =>
onFailure
case _ =>
onSuccess
},
_ => onSuccess
)

def widen[E2](pf: PartialFunction[E2, E]): CircuitBreaker[E2] = CircuitBreakerImpl[ScheduleState, E2](
state,
resetRequests,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package nl.vroste.rezilience

import nl.vroste.rezilience.CircuitBreaker.State
import nl.vroste.rezilience.CircuitBreaker.{ CircuitBreakerOpen, State, WrappedError }
import zio.test.Assertion._
import zio.test.TestAspect.nonFlaky
import zio.test._
import zio.{ durationInt, Queue, Schedule, ZIO }
import zio._

object CircuitBreakerSpec extends ZIOSpecDefault {
sealed trait Error
case object MyCallError extends Error
case object MyNotFatalError extends Error

val isFailure: PartialFunction[Error, Boolean] = {
case MyNotFatalError => false
case _: Error => true
}

// TODO add generator based checks with different nr of parallel calls to check
// for all kinds of race conditions
override def spec = suite("CircuitBreaker")(
Expand All @@ -30,11 +35,6 @@ object CircuitBreakerSpec extends ZIOSpecDefault {
} yield assert(result)(isLeft(equalTo(CircuitBreaker.CircuitBreakerOpen)))
} @@ TestAspect.diagnose(20.seconds),
test("ignore failures that should not be considered a failure") {
val isFailure: PartialFunction[Error, Boolean] = {
case MyNotFatalError => false
case _: Error => true
}

for {
cb <- CircuitBreaker.withMaxFailures(3, Schedule.exponential(1.second), isFailure)
_ <- cb(ZIO.fail(MyNotFatalError)).either.repeatN(3)
Expand Down Expand Up @@ -114,6 +114,39 @@ object CircuitBreakerSpec extends ZIOSpecDefault {
_ <- TestClock.adjust(1.second)
s1 <- stateChanges.take // HalfOpen
} yield assert(s1)(equalTo(State.HalfOpen))
},
test("reset to Closed after Half-Open on success")(
for {
cb <- CircuitBreaker.withMaxFailures(5, Schedule.exponential(2.second), isFailure)
intRef <- Ref.make(0)
error1 <- cb(ZIO.fail(MyNotFatalError)).flip
errors <- ZIO.replicateZIO(5)(cb(ZIO.fail(MyCallError)).flip)
_ <- TestClock.adjust(1.second)
error3 <- cb(intRef.update(_ + 1)).flip // no backend calls here
_ <- TestClock.adjust(1.second)
_ <- cb(intRef.update(_ + 1))
_ <- cb(intRef.update(_ + 1))
nrCalls <- intRef.get
} yield assertTrue(error1.asInstanceOf[WrappedError[Error]].error == MyNotFatalError) &&
assertTrue(errors.forall(_.asInstanceOf[WrappedError[Error]].error == MyCallError)) &&
assertTrue(error3 == CircuitBreakerOpen) &&
assertTrue(nrCalls == 2)
),
test("reset to Closed after Half-Open on error if isFailure=false") {
for {
cb <- CircuitBreaker.withMaxFailures(5, Schedule.exponential(2.second), isFailure)
intRef <- Ref.make(0)
errors <- ZIO.replicateZIO(5)(cb(ZIO.fail(MyCallError)).flip)
_ <- TestClock.adjust(1.second)
error1 <- cb(intRef.update(_ + 1)).flip // no backend calls here
_ <- TestClock.adjust(1.second)
error2 <- cb(ZIO.fail(MyNotFatalError)).flip
_ <- cb(intRef.update(_ + 1))
nrCalls <- intRef.get
} yield assertTrue(errors.forall(_.asInstanceOf[WrappedError[Error]].error == MyCallError)) &&
assertTrue(error1 == CircuitBreakerOpen) &&
assertTrue(error2.asInstanceOf[WrappedError[Error]].error == MyNotFatalError) &&
assertTrue(nrCalls == 1)
}
) @@ nonFlaky
}

0 comments on commit f6142e2

Please sign in to comment.