diff --git a/rezilience/shared/src/main/scala/nl/vroste/rezilience/Bulkhead.scala b/rezilience/shared/src/main/scala/nl/vroste/rezilience/Bulkhead.scala index 82b24b2b..b044364f 100644 --- a/rezilience/shared/src/main/scala/nl/vroste/rezilience/Bulkhead.scala +++ b/rezilience/shared/src/main/scala/nl/vroste/rezilience/Bulkhead.scala @@ -2,7 +2,10 @@ package nl.vroste.rezilience import nl.vroste.rezilience.Bulkhead._ import zio._ +import zio.metrics.MetricKeyType.Histogram.Boundaries +import zio.metrics.{ Metric, MetricKeyType, MetricLabel } import zio.stream.ZStream +import zio.Duration /** * Limits the number of simultaneous in-flight calls to an external resource @@ -13,6 +16,13 @@ import zio.stream.ZStream * * It also prevents queueing up of requests, which consume resources in the calling system, by rejecting calls when the * queue is full. + * + * Bulkhead can record the following metrics: + * - rezilience_bulkhead_calls_in_flight: histogram of number of calls in-flight + * - rezilience_bulkhead_calls_enqueued: histogram of number of calls enqueued + * - rezilience_bulkhead_calls_completed: number of calls that were completed (either successfully or failed) + * - rezilience_bulkhead_calls_rejected: number of calls rejected because of a full queue + * - rezilience_bulkhead_queue_time: histogram of queueing times (in nanoseconds) */ trait Bulkhead { self => @@ -32,11 +42,6 @@ trait Bulkhead { self => override def apply[R, E1 <: Any, A](f: ZIO[R, E1, A]): ZIO[R, Policy.PolicyError[E1], A] = self(f).mapError(_.fold(Policy.BulkheadRejection, Policy.WrappedError(_))) } - - /** - * Provides the number of in-flight and queued calls - */ - def metrics: UIO[Metrics] } object Bulkhead { @@ -52,9 +57,16 @@ object Bulkhead { final case class WrappedError[E](e: E) extends BulkheadError[E] case object BulkheadRejection extends BulkheadError[Nothing] - final case class Metrics(inFlight: Int, inQueue: Int) + final case class BulkheadException[E](error: BulkheadError[E]) extends
Exception("Bulkhead error") - case class BulkheadException[E](error: BulkheadError[E]) extends Exception("Bulkhead error") + /** Settings for Bulkhead metrics: labels to tag them with, how often in-flight/enqueued counts are sampled, and histogram boundaries. */ + final case class MetricSettings( + labels: Set[MetricLabel], + sampleInterval: Duration = 1.second, + // Call-count buckets start at 1 and double; queue-time buckets (nanoseconds) start at 1000 and grow tenfold + boundariesCalls: Boundaries = MetricKeyType.Histogram.Boundaries.exponential(1, 2, 11), + boundariesQueueTime: Boundaries = MetricKeyType.Histogram.Boundaries.exponential(1000, 10, 8) + ) private final case class State(enqueued: Int, inFlight: Int) { val total = enqueued + inFlight @@ -69,15 +81,20 @@ object Bulkhead { * Create a Bulkhead with the given parameters * * @param maxInFlightCalls - * Maxmimum of concurrent executing calls + * Maximum of concurrent executing calls * @param maxQueueing * Maximum queueing calls + * @param metricSettings + * Optional settings for recording metrics * @return */ - def make(maxInFlightCalls: Int, maxQueueing: Int = 32): ZIO[Scope, Nothing, Bulkhead] = + def make( + maxInFlightCalls: Int, + maxQueueing: Int = 32, + metricSettings: Option[MetricSettings] = None + ): ZIO[Scope, Nothing, Bulkhead] = for { - queue <- Queue - .bounded[UIO[Unit]](Util.nextPow2(maxQueueing)) + queue <- Queue.bounded[UIO[Unit]](Util.nextPow2(maxQueueing)) inFlightAndQueued <- Ref.make(State(0, 0)) onStart = inFlightAndQueued.update(_.startProcess) onEnd = inFlightAndQueued.update(_.endProcess) @@ -88,8 +105,41 @@ object Bulkhead { } .runDrain .forkScoped - } yield new Bulkhead { - override def apply[R, E, A](task: ZIO[R, E, A]): ZIO[R, BulkheadError[E], A] = + bulkhead = new BulkheadImpl(maxInFlightCalls, maxQueueing, metricSettings, queue, inFlightAndQueued) + _ <- metricSettings.map { metricSettings => + bulkhead.sampleMetrics + .repeat(Schedule.fixed(metricSettings.sampleInterval)) + .forkScoped + }.getOrElse(ZIO.unit) + } yield bulkhead + + final private case class BulkheadMetrics( + callsInFlight: Metric.Histogram[Double], + callsEnqueued: Metric.Histogram[Double], +
callsCompleted: Metric.Counter[Long], + callsRejected: Metric.Counter[Long], + queueTime: Metric.Histogram[Double] + ) + + final private class BulkheadImpl( + maxInFlightCalls: Int, + maxQueueing: Int = 32, + metricSettings: Option[MetricSettings], + queue: Queue[UIO[Unit]], + inFlightAndQueued: Ref[State] + ) extends Bulkhead { + private val metrics = metricSettings.map { case MetricSettings(labels, _, boundariesCalls, boundariesQueueTime) => + BulkheadMetrics( + callsInFlight = Metric.histogram("rezilience_bulkhead_calls_in_flight", boundariesCalls).tagged(labels), + callsEnqueued = Metric.histogram("rezilience_bulkhead_calls_enqueued", boundariesCalls).tagged(labels), + callsCompleted = Metric.counter("rezilience_bulkhead_calls_completed").tagged(labels), + callsRejected = Metric.counter("rezilience_bulkhead_calls_rejected").tagged(labels), + queueTime = Metric.histogram("rezilience_bulkhead_queue_time", boundariesQueueTime).tagged(labels) + ) + } + + override def apply[R, E, A](task: ZIO[R, E, A]): ZIO[R, BulkheadError[E], A] = + withRecordQueueTime { recordQueueTime => for { start <- Promise.make[Nothing, Unit] done <- Promise.make[Nothing, Unit] @@ -104,14 +154,47 @@ object Bulkhead { }.flatten.uninterruptible onInterruptOrCompletion = done.succeed(()) - result <- ZIO.scoped[R] { - ZIO - .acquireReleaseInterruptible(enqueueAction.onInterrupt(onInterruptOrCompletion))( - onInterruptOrCompletion - ) *> start.await *> task.mapError(WrappedError(_)) - } + result <- + ZIO + .scoped[R] { + ZIO + .acquireReleaseInterruptible(enqueueAction.onInterrupt(onInterruptOrCompletion))( + onInterruptOrCompletion + ) *> start.await *> recordQueueTime *> task.mapError(WrappedError(_)) + } + .tapBoth( + { + case BulkheadRejection => + ZIO.fromOption(metrics).flatMap(_.callsRejected.increment).ignore + case _ => ZIO.fromOption(metrics).flatMap(_.callsCompleted.increment).ignore + }, + _ => ZIO.fromOption(metrics).flatMap(_.callsCompleted.increment).ignore + ) } yield result + } - 
override def metrics: UIO[Metrics] = inFlightAndQueued.get.map(state => Metrics(state.inFlight, state.enqueued)) - } + private def withRecordQueueTime[R, E, A](f: UIO[Unit] => ZIO[R, E, A]) = for { + enqueueTime <- ZIO.clockWith(_.instant) + record: UIO[Unit] = for { + startTime <- ZIO.clockWith(_.instant) + queueTime = + Math.max( + 0.0d, + java.time.Duration.between(enqueueTime, startTime).toNanos.toDouble + ) + _ <- ZIO.fromOption(metrics).flatMap(_.queueTime.update(queueTime)).ignore + } yield () + result <- f(record) + } yield result + /** Records the current in-flight and enqueued call counts in their histograms; does nothing when metrics are disabled. */ + def sampleMetrics: UIO[Any] = + ZIO + .fromOption(metrics) + .flatMap { metrics => + inFlightAndQueued.get.flatMap { case State(enqueued, inFlight) => + metrics.callsInFlight.update(inFlight.toDouble) *> metrics.callsEnqueued.update(enqueued.toDouble) + } + } + .ignore + } } diff --git a/rezilience/shared/src/main/scala/nl/vroste/rezilience/CircuitBreaker.scala b/rezilience/shared/src/main/scala/nl/vroste/rezilience/CircuitBreaker.scala index 9b779e2a..a28158a0 100644 --- a/rezilience/shared/src/main/scala/nl/vroste/rezilience/CircuitBreaker.scala +++ b/rezilience/shared/src/main/scala/nl/vroste/rezilience/CircuitBreaker.scala @@ -117,6 +117,16 @@ object CircuitBreaker { case object Open extends State } + /** + * Settings for metrics + * + * @param labels + * Set of labels to annotate metrics with, to distinguish this circuit breaker from others in the same application. + */ + final case class MetricSettings( + labels: Set[MetricLabel] + ) + /** * Create a CircuitBreaker that fails when a number of successive failures (no pun intended) has been counted * @@ -127,9 +137,8 @@ object CircuitBreaker { * @param isFailure * Only failures that match according to `isFailure` are treated as failures by the circuit breaker. Other failures * are passed on, circumventing the circuit breaker's failure counter.
- * @param metricLabels - * Set of labels to annotate metrics with, to distinguish this circuit breaker from others in the same application. - * No metrics are recorded if None is passed. + * @param metricSettings + * Settings for recording metrics. No metrics are recorded if None is passed. * @return * The CircuitBreaker as a managed resource */ @@ -137,9 +146,9 @@ object CircuitBreaker { maxFailures: Int, resetPolicy: Schedule[Any, Any, Any] = Retry.Schedules.exponentialBackoff(1.second, 1.minute), isFailure: PartialFunction[E, Boolean] = isFailureAny[E], - metricLabels: Option[Set[MetricLabel]] = None + metricSettings: Option[MetricSettings] = None ): ZIO[Scope, Nothing, CircuitBreaker[E]] = - make(TrippingStrategy.failureCount(maxFailures), resetPolicy, isFailure, metricLabels) + make(TrippingStrategy.failureCount(maxFailures), resetPolicy, isFailure, metricSettings) /** * Create a CircuitBreaker with the given tripping strategy @@ -161,7 +170,7 @@ object CircuitBreaker { resetPolicy: Schedule[Any, Any, Any] = Retry.Schedules.exponentialBackoff(1.second, 1.minute), // TODO should move to its own namespace isFailure: PartialFunction[E, Boolean] = isFailureAny[E], - metricLabels: Option[Set[MetricLabel]] = None + metricSettings: Option[MetricSettings] = None ): ZIO[Scope, Nothing, CircuitBreaker[E]] = for { strategy <- trippingStrategy @@ -178,7 +187,7 @@ object CircuitBreaker { schedule, isFailure, halfOpenSwitch, - metricLabels + metricSettings ) _ <- cb.resetProcess _ <- cb.trackStateChanges @@ -192,12 +201,12 @@ object CircuitBreaker { schedule: Schedule.Driver[ScheduleState, Any, Any, Any], isFailure: PartialFunction[E, Boolean], halfOpenSwitch: Ref[Boolean], - labels: Option[Set[MetricLabel]] + metricSettings: Option[MetricSettings] ) extends CircuitBreaker[E] { override val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]] = stateChangesHub.subscribe - val metrics = labels.map { labels => + val metrics = metricSettings.map(_.labels).map { labels => 
CircuitBreakerMetrics( state = Metric .gauge("rezilience_circuit_breaker_state") @@ -326,7 +335,7 @@ object CircuitBreaker { schedule, pf andThen isFailure, halfOpenSwitch, - labels + metricSettings ) override def currentState: UIO[State] = state.get diff --git a/rezilience/shared/src/main/scala/nl/vroste/rezilience/Policy.scala b/rezilience/shared/src/main/scala/nl/vroste/rezilience/Policy.scala index be7c7802..8238fc32 100644 --- a/rezilience/shared/src/main/scala/nl/vroste/rezilience/Policy.scala +++ b/rezilience/shared/src/main/scala/nl/vroste/rezilience/Policy.scala @@ -1,5 +1,5 @@ package nl.vroste.rezilience -import nl.vroste.rezilience.Bulkhead.{ BulkheadError, Metrics } +import nl.vroste.rezilience.Bulkhead.BulkheadError import nl.vroste.rezilience.CircuitBreaker.CircuitBreakerCallError import nl.vroste.rezilience.Policy.{ flattenWrappedError, PolicyError } import zio.{ Queue, UIO, ZIO } @@ -103,8 +103,6 @@ object Policy { val noopBulkhead: Bulkhead = new Bulkhead { override def apply[R, E, A](task: ZIO[R, E, A]): ZIO[R, BulkheadError[E], A] = task.mapError(Bulkhead.WrappedError(_)) - - override def metrics: UIO[Metrics] = ZIO.succeed(Metrics.apply(0, 0)) } val noopRateLimiter: RateLimiter = new RateLimiter { diff --git a/rezilience/shared/src/test/scala/nl/vroste/rezilience/BulkheadSpec.scala b/rezilience/shared/src/test/scala/nl/vroste/rezilience/BulkheadSpec.scala index 6377ad92..906fef98 100644 --- a/rezilience/shared/src/test/scala/nl/vroste/rezilience/BulkheadSpec.scala +++ b/rezilience/shared/src/test/scala/nl/vroste/rezilience/BulkheadSpec.scala @@ -1,5 +1,7 @@ package nl.vroste.rezilience +import nl.vroste.rezilience.Bulkhead.MetricSettings +import zio.metrics.{ Metric, MetricLabel } import zio.test.Assertion._ import zio.test.TestAspect.{ nonFlaky, timed, timeout } import zio.test._ @@ -109,6 +111,54 @@ object BulkheadSpec extends ZIOSpecDefault { _ <- interrupted.await } yield assertCompletes } - } + }, + suite("Bulkhead with metrics")( + 
test("has correct metrics after interruption while started") { + for { + labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString))) + bulkhead <- + Bulkhead.make(10, 5, Some(MetricSettings(labels))) + latch <- Promise.make[Nothing, Unit] + interrupted <- Promise.make[Nothing, Unit] + fib <- bulkhead((latch.succeed(()) *> ZIO.never).onInterrupt(interrupted.succeed(()))).fork + _ <- latch.await + _ <- fib.interrupt + _ <- interrupted.await + // TODO it's a histogram, so check that + metricEnqueued <- Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels).value + } yield assertCompletes + } +// test("emits correct currently in flight metrics") { +// withMetricsCollection { onMetrics => +// Bulkhead +// .make(10, 5) +// .flatMap(Bulkhead.addMetrics(_, onMetrics, metricsInterval = 1.second)) +// .use { bulkhead => +// for { +// latch1 <- Promise.make[Nothing, Unit] +// latch2 <- Promise.make[Nothing, Unit] +// continue <- Promise.make[Nothing, Unit] +// _ <- TestClock.adjust(1.second) +// fib <- bulkhead(latch1.succeed(()) *> continue.await).fork +// _ <- latch1.await +// _ <- TestClock.adjust(1.second) +// fib2 <- bulkhead(latch2.succeed(()) *> continue.await).fork +// _ <- latch2.await +// _ <- TestClock.adjust(1.second) +// _ <- continue.succeed(()) +// _ <- TestClock.adjust(1.second) +// _ <- fib.join +// _ <- fib2.join +// } yield () +// } +// } { (metrics, _) => +// UIO( +// assertTrue(metrics.map(_.currentlyInFlight) == Chunk(0L, 1, 2, 0, 0)) && +// assertTrue(metrics.reduce(_ + _).inFlight.getMaxValue == 2L) +// ) +// } +// } +// ) + ) ) @@ nonFlaky @@ timeout(120.seconds) @@ timed } diff --git a/rezilience/shared/src/test/scala/nl/vroste/rezilience/CircuitBreakerSpec.scala b/rezilience/shared/src/test/scala/nl/vroste/rezilience/CircuitBreakerSpec.scala index 17fb091f..219b3429 100644 --- a/rezilience/shared/src/test/scala/nl/vroste/rezilience/CircuitBreakerSpec.scala +++ 
b/rezilience/shared/src/test/scala/nl/vroste/rezilience/CircuitBreakerSpec.scala @@ -1,6 +1,6 @@ package nl.vroste.rezilience -import nl.vroste.rezilience.CircuitBreaker.{ CircuitBreakerOpen, State, WrappedError } +import nl.vroste.rezilience.CircuitBreaker.{ CircuitBreakerOpen, MetricSettings, State, WrappedError } import zio._ import zio.metrics.{ Metric, MetricLabel } import zio.stream.ZStream @@ -157,7 +157,7 @@ object CircuitBreakerSpec extends ZIOSpecDefault { for { labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString))) _ <- CircuitBreaker - .withMaxFailures(3, metricLabels = Some(labels)) + .withMaxFailures(3, metricSettings = Some(MetricSettings(labels))) metricState <- Metric.gauge("rezilience_circuit_breaker_calls_state").tagged(labels).value metricStateChanges <- Metric.counter("rezilience_circuit_breaker_calls_state_changes").tagged(labels).value metricSuccess <- Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels).value @@ -171,7 +171,7 @@ object CircuitBreakerSpec extends ZIOSpecDefault { for { labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString))) cb <- CircuitBreaker - .withMaxFailures(3, metricLabels = Some(labels)) + .withMaxFailures(3, metricSettings = Some(MetricSettings(labels))) _ <- cb(ZIO.unit) _ <- cb(ZIO.fail("Failed")).either metricSuccess <- Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels).value @@ -182,7 +182,7 @@ object CircuitBreakerSpec extends ZIOSpecDefault { for { labels <- ZIO.randomWith(_.nextUUID).map(uuid => Set(MetricLabel("test_id", uuid.toString))) cb <- CircuitBreaker - .withMaxFailures(10, Schedule.exponential(1.second), metricLabels = Some(labels)) + .withMaxFailures(10, Schedule.exponential(1.second), metricSettings = Some(MetricSettings(labels))) metricStateChanges = Metric.counter("rezilience_circuit_breaker_state_changes").tagged(labels) metricState = 
Metric.gauge("rezilience_circuit_breaker_state").tagged(labels)