diff --git a/build.sbt b/build.sbt index e7fe222c..237d80e8 100644 --- a/build.sbt +++ b/build.sbt @@ -40,13 +40,7 @@ lazy val root = project lazy val rezilience = crossProject(JSPlatform, JVMPlatform) .in(file("rezilience")) - .jvmSettings( - commonJvmSettings ++ Seq( - libraryDependencies ++= Seq( - "org.hdrhistogram" % "HdrHistogram" % "2.1.12" - ) - ) - ) + .jvmSettings(commonJvmSettings) .jsSettings(commonJsSettings) .settings( name := "rezilience", diff --git a/docs/docs/docs/metrics.md b/docs/docs/docs/metrics.md deleted file mode 100644 index 7e7991a1..00000000 --- a/docs/docs/docs/metrics.md +++ /dev/null @@ -1,25 +0,0 @@ ---- -layout: docs -title: Metrics -permalink: docs/metrics/ ---- - -# Metrics - -On the JVM, `rezilience` policies offer metrics for monitoring and alerting purposes, for example when a queue is increasing. - -The policies have a method `makeWithMetrics` that allows specifying a 'callback' effect that is executed periodically and provided with a Metrics object. - -Some metric values are recorded as distributions via [HDR Histogram](https://hdrhistogram.github.io/HdrHistogram/), for accurate statistics over a wide range of values. Accessors are provided for mean values. - -Two or more metric objects can be combined using the `+` operator to get metrics for the combined interval, with correctly summed histograms. - -Finally, during the release of a policy's `ZManaged`, metrics for the final interval are emitted to ensure an accurate total. - -## RateLimiter - -| Name | Type | Description | -|----------------|-----------------------------------------------------|-------------------------------------| -| latency | Histogram | Time between when a task is enqueued and when it is started | -| tasksEnqueued | Int | Total number of tasks enqueued in this interval | -| currentlyEnqueued | Int | Number of tasks currently waiting to be started \ No newline at end of file diff --git a/rezilience/js/src/main/scala/nl/vroste/rezilience/RateLimiterPlatformSpecific.scala b/rezilience/js/src/main/scala/nl/vroste/rezilience/RateLimiterPlatformSpecific.scala deleted file mode 100644 index be35cc70..00000000 --- a/rezilience/js/src/main/scala/nl/vroste/rezilience/RateLimiterPlatformSpecific.scala +++ /dev/null @@ -1,3 +0,0 @@ -package nl.vroste.rezilience - -trait RateLimiterPlatformSpecificObj {} diff --git a/rezilience/jvm/src/main/scala/nl/vroste/rezilience/HistogramSettings.scala b/rezilience/jvm/src/main/scala/nl/vroste/rezilience/HistogramSettings.scala deleted file mode 100644 index ba305848..00000000 --- a/rezilience/jvm/src/main/scala/nl/vroste/rezilience/HistogramSettings.scala +++ /dev/null @@ -1,5 +0,0 @@ -package nl.vroste.rezilience - -import zio.duration.Duration - -case class HistogramSettings(min: Duration, max: Duration, significantDigits: Int = 2) diff --git a/rezilience/jvm/src/main/scala/nl/vroste/rezilience/HistogramUtil.scala b/rezilience/jvm/src/main/scala/nl/vroste/rezilience/HistogramUtil.scala deleted file mode 100644 index 7a30f14a..00000000 --- a/rezilience/jvm/src/main/scala/nl/vroste/rezilience/HistogramUtil.scala +++ /dev/null @@ -1,17 +0,0 @@ -package nl.vroste.rezilience - -import org.HdrHistogram.AbstractHistogram - -object HistogramUtil { - def mergeHistograms[T <: AbstractHistogram](h1: T, h2: T): T = { - val newHist = h1.copy() - newHist.add(h2) - newHist.asInstanceOf[T] - } - - def addToHistogram[T <: AbstractHistogram](hist: T, values: Seq[Long]): T = { - val newHist = hist.copy().asInstanceOf[T] - values.foreach(newHist.recordValue) - newHist - } -} diff --git a/rezilience/jvm/src/main/scala/nl/vroste/rezilience/RateLimiterPlatformSpecific.scala b/rezilience/jvm/src/main/scala/nl/vroste/rezilience/RateLimiterPlatformSpecific.scala deleted file mode 100644 index e10263ab..00000000 --- a/rezilience/jvm/src/main/scala/nl/vroste/rezilience/RateLimiterPlatformSpecific.scala +++ /dev/null @@ -1,160 +0,0 @@ -package nl.vroste.rezilience - -import nl.vroste.rezilience.RateLimiterPlatformSpecificObj.RateLimiterMetricsInternal -import org.HdrHistogram.{ AbstractHistogram, IntCountsHistogram } -import zio.{ clock, Ref, Schedule, UIO, ZIO, ZManaged } -import zio.clock.Clock -import zio.duration.{ durationInt, Duration } - -import java.time.Instant - -final case class RateLimiterMetrics( - /* - * Interval in which these metrics were collected - */ - interval: Duration, - /* - * Times that tasks were queued by the RateLimiter before starting execution - */ - latency: AbstractHistogram, - /* - * Number of tasks that were enqueued in this metrics interval - */ - tasksEnqueued: Long, - /* - * Number of tasks that are currently enqueued - */ - currentlyEnqueued: Long -) { - import HistogramUtil._ - val tasksStarted = latency.getTotalCount - - override def toString: String = - Seq( - ("interval", interval.getSeconds, "s"), - ("tasks enqueued in interval", tasksEnqueued, ""), - ("tasks currently enqueued", currentlyEnqueued, ""), - ("tasks started", tasksStarted, ""), - ("mean latency", latency.getMean.toInt, "ms"), - ("95% latency", latency.getValueAtPercentile(95).toInt, "ms"), - ("min latency", latency.getMinValue.toInt, "ms"), - ("mean latency", latency.getMean.toInt, "ms") - ).map { case (name, value, unit) => s"${name}=${value}${if (unit.isEmpty) "" else " " + unit}" }.mkString(", ") - - def +(that: RateLimiterMetrics): RateLimiterMetrics = copy( - interval = interval plus that.interval, - latency = mergeHistograms(latency, that.latency), - tasksEnqueued = tasksEnqueued + that.tasksEnqueued - ) -} - -object RateLimiterMetrics { - private val emptyLatency = new IntCountsHistogram(1, 6000000, 2) - - val empty = RateLimiterMetrics(0.seconds, emptyLatency, 0, 0) - -} - -trait RateLimiterPlatformSpecificObj { - - /** - * Create a RateLimiter with metrics - * - * Metrics are emitted at a regular interval. When the RateLimiter is released, metrics for the - * final interval are emitted. - * - * @param max - * @param interval - * @param onMetrics - * @param metricsInterval - * @param latencyHistogramSettings - * @return - */ - def makeWithMetrics( - max: Int, - interval: Duration = 1.second, - onMetrics: RateLimiterMetrics => UIO[Any], - metricsInterval: Duration = 10.seconds, - latencyHistogramSettings: HistogramSettings = HistogramSettings(1.milli, 2.minutes) - ): ZManaged[Clock, Nothing, RateLimiter] = { - - def collectMetrics(currentMetrics: Ref[RateLimiterMetricsInternal]) = - for { - now <- clock.instant - newMetrics = RateLimiterMetricsInternal.empty(now, latencyHistogramSettings) - lastMetrics <- - currentMetrics.getAndUpdate(metrics => newMetrics.copy(currentlyEnqueued = metrics.currentlyEnqueued)) - interval = java.time.Duration.between(lastMetrics.start, now) - _ <- onMetrics(lastMetrics.toUserMetrics(interval)) - } yield () - - def runCollectMetricsLoop(metrics: Ref[RateLimiterMetricsInternal]) = - collectMetrics(metrics) - .repeat(Schedule.fixed(metricsInterval)) - .delay(metricsInterval) - .forkManaged - .ensuring(collectMetrics(metrics)) - - for { - inner <- RateLimiter.make(max, interval) - now <- clock.instant.toManaged_ - metrics <- Ref.make(RateLimiterMetricsInternal.empty(now, latencyHistogramSettings)).toManaged_ - _ <- runCollectMetricsLoop(metrics) - env <- ZManaged.environment[Clock] - } yield new RateLimiter { - override def apply[R, E, A](task: ZIO[R, E, A]): ZIO[R, E, A] = for { - enqueueTime <- clock.instant.provide(env) - // Keep track of whether the task was started to have correct statistics under interruption - started <- Ref.make(false) - result <- metrics - .update(_.enqueueTask) - .toManaged(_ => metrics.update(_.taskInterrupted).unlessM(started.get)) - .use_ { - inner.apply { - for { - startTime <- clock.instant.provide(env) - latency = java.time.Duration.between(enqueueTime, startTime) - _ <- metrics.update(_.taskStarted(latency)).ensuring(started.set(true)) - result <- task - } yield result - } - } - } yield result - } - } -} - -private[rezilience] object RateLimiterPlatformSpecificObj extends RateLimiterPlatformSpecificObj { - final case class RateLimiterMetricsInternal( - start: Instant, - latency: AbstractHistogram, - tasksEnqueued: Long, - currentlyEnqueued: Long - ) { - import HistogramUtil._ - - def toUserMetrics(interval: Duration): RateLimiterMetrics = - RateLimiterMetrics(interval, latency, tasksEnqueued, currentlyEnqueued) - - def taskStarted(latencySample: Duration): RateLimiterMetricsInternal = copy( - latency = addToHistogram(latency, Seq(Math.max(0, latencySample.toMillis))), - currentlyEnqueued = currentlyEnqueued - 1 - ) - - def taskInterrupted = copy(currentlyEnqueued = currentlyEnqueued - 1) - - def enqueueTask: RateLimiterMetricsInternal = - copy(tasksEnqueued = tasksEnqueued + 1, currentlyEnqueued = currentlyEnqueued + 1) - } - - object RateLimiterMetricsInternal { - def empty(now: Instant, settings: HistogramSettings) = - RateLimiterMetricsInternal( - start = now, - latency = new IntCountsHistogram(settings.min.toMillis, settings.max.toMillis, settings.significantDigits), - tasksEnqueued = 0, - currentlyEnqueued = 0 - ) - } - -} diff --git a/rezilience/jvm/src/test/scala/nl/vroste/rezilience/RateLimiterMetricsSpec.scala b/rezilience/jvm/src/test/scala/nl/vroste/rezilience/RateLimiterMetricsSpec.scala deleted file mode 100644 index 89aab36b..00000000 --- a/rezilience/jvm/src/test/scala/nl/vroste/rezilience/RateLimiterMetricsSpec.scala +++ /dev/null @@ -1,99 +0,0 @@ -package nl.vroste.rezilience - -import zio.test._ -import zio.duration._ -import zio.{ clock, Promise, Ref, Schedule, UIO, ZIO } -import zio.clock.Clock -import zio.random.Random -import zio.test.Assertion._ -import zio.test.environment.TestClock - -object RateLimiterMetricsSpec extends DefaultRunnableSpec { - override def spec = suite("RateLimiter")( - suite("preserves RateLimiter behavior")( - testM("will interrupt the effect when a call is interrupted") { - RateLimiterPlatformSpecificObj.makeWithMetrics(10, 1.second, _ => UIO.unit).use { rl => - for { - latch <- Promise.make[Nothing, Unit] - interrupted <- Promise.make[Nothing, Unit] - fib <- rl((latch.succeed(()) *> ZIO.never).onInterrupt(interrupted.succeed(()))).fork - _ <- latch.await - _ <- fib.interrupt - _ <- interrupted.await - } yield assertCompletes - } - } - ), - suite("metrics")( - testM("emits metrics after use") { - for { - metricsRef <- Promise.make[Nothing, RateLimiterMetrics] - _ <- RateLimiterPlatformSpecificObj - .makeWithMetrics(10, 1.second, onMetrics = metricsRef.succeed, metricsInterval = 5.second) - .use { rl => - rl(UIO.unit) - } - metrics <- metricsRef.await - - } yield assert(metrics)(hasField("tasksStarted", _.tasksStarted, equalTo(1L))) && - assert(metrics)(hasField("tasksEnqueued", _.tasksEnqueued, equalTo(1L))) - }, - testM("emits metrics at the interval") { - for { - metricsRef <- Ref.make(Vector.empty[RateLimiterMetrics]) - _ <- RateLimiterPlatformSpecificObj - .makeWithMetrics( - 10, - 1.second, - onMetrics = m => metricsRef.update(_ :+ m), - metricsInterval = 1.second - ) - .use { rl => - for { - _ <- rl(UIO.unit).fork.repeatN(100) - _ <- TestClock.adjust(1.second) - _ <- TestClock.adjust(1.second) - _ <- TestClock.adjust(500.millis) - } yield () - } - metrics <- metricsRef.get - } yield assert(metrics)(hasSize(equalTo(3))) - }, - testM("can sum metrics") { - for { - metricsRef <- Ref.make(RateLimiterMetrics.empty) - _ <- RateLimiterPlatformSpecificObj - .makeWithMetrics( - 10, - 1.second, - onMetrics = m => metricsRef.update(_ + m), - metricsInterval = 1.second - ) - .use { rl => - for { - _ <- rl(UIO.unit).fork.repeatN(100) - _ <- TestClock.adjust(1.second) - _ <- TestClock.adjust(1.second) - _ <- TestClock.adjust(500.millis) - } yield () - } - metrics <- metricsRef.get - } yield assert(metrics)(hasField("interval", _.interval, equalTo(2500.millis))) - } - ), - suite("metrics live")( - testM("emits metrics") { - RateLimiterPlatformSpecificObj - .makeWithMetrics(10, 1.second, onMetrics = metrics => UIO(println(metrics)), metricsInterval = 5.second) - .use { rl => - for { - _ <- rl(clock.instant.flatMap(now => UIO(println(now)))).fork - .repeat(Schedule.fixed(100.millis)) - .fork - _ <- ZIO.sleep(3.seconds) - } yield assertCompletes - } - } - ).provideCustomLayer(Clock.live ++ Random.live) - ) -} diff --git a/rezilience/shared/src/main/scala/nl/vroste/rezilience/RateLimiter.scala b/rezilience/shared/src/main/scala/nl/vroste/rezilience/RateLimiter.scala index 44c9ca1f..e9e54a2a 100644 --- a/rezilience/shared/src/main/scala/nl/vroste/rezilience/RateLimiter.scala +++ b/rezilience/shared/src/main/scala/nl/vroste/rezilience/RateLimiter.scala @@ -34,7 +34,7 @@ trait RateLimiter { self => } } -object RateLimiter extends RateLimiterPlatformSpecificObj { +object RateLimiter { /** * Creates a RateLimiter as Managed resource @@ -53,7 +53,7 @@ object RateLimiter extends RateLimiterPlatformSpecificObj { .bounded[(Ref[Boolean], UIO[Any])](zio.internal.RingBuffer.nextPow2(max)) .toManaged_ // Power of two because it is a more efficient queue implementation _ <- ZStream - .fromQueue(q, 1) // Until https://github.com/zio/zio/issues/4190 is fixed + .fromQueue(q, maxChunkSize = max) .filterM { case (interrupted, effect @ _) => interrupted.get.map(!_) } .throttleShape(max.toLong, interval, max.toLong)(_.size.toLong) .mapMParUnordered(Int.MaxValue) { case (interrupted @ _, effect) => effect }