Skip to content

Commit

Permalink
Merge pull request #567 from gemini-hlsw/pr/deglitcher
Browse files Browse the repository at this point in the history
`Deglitcher`
  • Loading branch information
armanbilge authored Nov 23, 2023
2 parents 5c01444 + fcade64 commit 3942653
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ lazy val tests = crossProject(JVMPlatform, JSPlatform)
Settings.Libraries.Discipline.value ++
Settings.Libraries.DisciplineMUnit.value ++
Settings.Libraries.CatsLaws.value ++
Settings.Libraries.CatsEffectTestkit.value ++
Settings.Libraries.MonocleMacro.value ++
Settings.Libraries.MonocleLaw.value).map(_ % Test)
)
Expand Down
42 changes: 42 additions & 0 deletions modules/core/shared/src/main/scala/crystal/Deglitcher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package crystal

import cats.effect.Ref
import cats.effect.Temporal
import cats.syntax.all._
import fs2.Pipe
import fs2.Stream

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

final class Deglitcher[F[_]] private (
waitUntil: Ref[F, FiniteDuration],
timeout: FiniteDuration
)(using F: Temporal[F]) {

def throttle: F[Unit] =
F.realTime.flatMap(now => waitUntil.set(now + timeout))

def debounce[A]: Pipe[F, A, A] =
_.switchMap { a =>
Stream.eval {
def wait: F[Unit] =
(waitUntil.get, F.realTime).flatMapN { (waitUntil, now) =>
if (waitUntil > now)
F.sleep(waitUntil - now) *> wait
else F.unit
}

wait.as(a)
}
}

}

object Deglitcher {
def apply[F[_]](timeout: FiniteDuration)(using F: Temporal[F]): F[Deglitcher[F]] =
F.ref(Duration.Zero).map(new Deglitcher(_, timeout))
}
78 changes: 78 additions & 0 deletions modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package crystal

import cats.effect.IO
import cats.effect.testkit.TestControl
import cats.syntax.all.*
import fs2.Stream
import munit.CatsEffectSuite

import scala.concurrent.duration.*

class DeglitcherSuite extends CatsEffectSuite {

def server[A](values: (A, FiniteDuration)*): Stream[IO, A] =
Stream
.emits(values)
.evalMap { (a, t) =>
IO.sleep(t).as(a)
}
.prefetchN(Int.MaxValue)

test("emits elements immediately if unthrottled") {
TestControl
.executeEmbed {
Deglitcher[IO](100.millis).flatMap { deglitcher =>
server(0 -> 1.millis, 1 -> 1.millis, 2 -> 1.millis)
.through(deglitcher.debounce)
.evalMap(IO.realTime.tupleLeft(_))
.compile
.toList
}
}
.assertEquals(List(0 -> 1.millis, 1 -> 2.millis, 2 -> 3.millis))
}

test("respects throttling") {
TestControl
.executeEmbed {
Deglitcher[IO](100.millis).flatMap { deglitcher =>
server(0 -> 0.millis, 1 -> 10.millis, 2 -> 20.millis, 3 -> 200.millis)
.through(deglitcher.debounce)
.evalMap(IO.realTime.tupleLeft(_))
.compile
.toList
.background
.use { result =>
IO.sleep(1.millis) *> deglitcher.throttle *> result.flatMap(_.embedError)
}
}
}
.assertEquals(List(0 -> 0.millis, 2 -> 101.millis, 3 -> 230.millis))
}

test("respects multiple throttles") {
TestControl
.executeEmbed {
Deglitcher[IO](100.millis).flatMap { deglitcher =>
server(0 -> 20.millis)
.through(deglitcher.debounce)
.evalMap(IO.realTime.tupleLeft(_))
.compile
.toList
.background
.use { result =>
IO.sleep(10.millis) *>
deglitcher.throttle *>
IO.sleep(40.millis) *>
deglitcher.throttle *>
result.flatMap(_.embedError)
}
}
}
.assertEquals(List(0 -> 150.millis))
}

}
6 changes: 6 additions & 0 deletions project/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ object Settings {
)
)

val CatsEffectTestkit = Def.setting(
Seq[ModuleID](
"org.typelevel" %%% "cats-effect-testkit" % catsEffect
)
)

val CatsLaws = Def.setting(
Seq[ModuleID](
"org.typelevel" %%% "cats-laws" % cats
Expand Down

0 comments on commit 3942653

Please sign in to comment.