From 9b5d9bda90d7f071d50f963fc60bf89c4f7a4680 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 22 Nov 2023 21:34:37 +0000 Subject: [PATCH 1/3] `Deglitcher` --- build.sbt | 1 + .../src/main/scala/crystal/Deglitcher.scala | 42 ++++++++++++++ .../test/scala/crystal/DeglitcherSuite.scala | 56 +++++++++++++++++++ project/Settings.scala | 6 ++ 4 files changed, 105 insertions(+) create mode 100644 modules/core/shared/src/main/scala/crystal/Deglitcher.scala create mode 100644 modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala diff --git a/build.sbt b/build.sbt index de4789ad..176ac377 100644 --- a/build.sbt +++ b/build.sbt @@ -45,6 +45,7 @@ lazy val tests = crossProject(JVMPlatform, JSPlatform) Settings.Libraries.Discipline.value ++ Settings.Libraries.DisciplineMUnit.value ++ Settings.Libraries.CatsLaws.value ++ + Settings.Libraries.CatsEffectTestkit.value ++ Settings.Libraries.MonocleMacro.value ++ Settings.Libraries.MonocleLaw.value).map(_ % Test) ) diff --git a/modules/core/shared/src/main/scala/crystal/Deglitcher.scala b/modules/core/shared/src/main/scala/crystal/Deglitcher.scala new file mode 100644 index 00000000..8fefe34d --- /dev/null +++ b/modules/core/shared/src/main/scala/crystal/Deglitcher.scala @@ -0,0 +1,42 @@ +// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA) +// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause + +package crystal + +import cats.effect.Ref +import cats.effect.Temporal +import cats.syntax.all._ +import fs2.Stream +import fs2.Pipe + +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration + +final class Deglitcher[F[_]] private ( + waitUntil: Ref[F, FiniteDuration], + timeout: FiniteDuration +)(using F: Temporal[F]) { + + def throttle: F[Unit] = + F.realTime.flatMap(now => waitUntil.set(now + timeout)) + + def debounce[A]: Pipe[F, A, A] = + _.switchMap { a => + Stream.eval { + def wait: F[Unit] = + (waitUntil.get, F.realTime).flatMapN { (waitUntil, now) => + if (waitUntil > now) + F.sleep(waitUntil - now) *> wait + else F.unit + } + + wait.as(a) + } + } + +} + +object Deglitcher { + def apply[F[_]](timeout: FiniteDuration)(using F: Temporal[F]): F[Deglitcher[F]] = + F.ref(Duration.Zero).map(new Deglitcher(_, timeout)) +} diff --git a/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala b/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala new file mode 100644 index 00000000..e6c15170 --- /dev/null +++ b/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala @@ -0,0 +1,56 @@ +// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA) +// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause + +package crystal + +import cats.effect.IO +import cats.effect.testkit.TestControl +import cats.syntax.all.* +import fs2.Stream +import munit.CatsEffectSuite + +import scala.concurrent.duration.* + +class DeglitcherSuite extends CatsEffectSuite { + + def server[A](values: (A, FiniteDuration)*): Stream[IO, A] = + Stream + .emits(values) + .evalMap { (a, t) => + IO.sleep(t).as(a) + } + .prefetchN(Int.MaxValue) + + test("emits elements immediately if unthrottled") { + TestControl + .executeEmbed { + Deglitcher[IO](100.millis).flatMap { deglitcher => + server(() -> 1.millis, () -> 1.millis, () -> 1.millis) + .through(deglitcher.debounce) + .evalMap(_ => IO.realTime) + .compile + .toList + } + } + .assertEquals(List(1.millis, 2.millis, 3.millis)) + } + + test("respects throttling") { + TestControl + .executeEmbed { + Deglitcher[IO](100.millis).flatMap { deglitcher => + server(0 -> 0.millis, 1 -> 10.millis, 2 -> 20.millis, 3 -> 200.millis) + .through(deglitcher.debounce) + .evalMap(IO.realTime.tupleLeft(_)) + .compile + .toList + .background + .use { result => + IO.sleep(1.millis) *> deglitcher.throttle *> result.flatMap(_.embedError) + } + } + } + .assertEquals(List(0 -> 0.millis, 2 -> 101.millis, 3 -> 230.millis)) + } + +} diff --git a/project/Settings.scala b/project/Settings.scala index cde22cad..395cfff0 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -34,6 +34,12 @@ object Settings { ) ) + val CatsEffectTestkit = Def.setting( + Seq[ModuleID]( + "org.typelevel" %%% "cats-effect-testkit" % catsEffect + ) + ) + val CatsLaws = Def.setting( Seq[ModuleID]( "org.typelevel" %%% "cats-laws" % cats From 439df31f8a0323f2861f632444642e347e7c6ad9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 22 Nov 2023 21:41:29 +0000 Subject: [PATCH 2/3] Add test for multiple throttles --- .../test/scala/crystal/DeglitcherSuite.scala | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala b/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala index e6c15170..8aef7e07 100644 --- a/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala +++ b/modules/tests/shared/src/test/scala/crystal/DeglitcherSuite.scala @@ -25,14 +25,14 @@ class DeglitcherSuite extends CatsEffectSuite { TestControl .executeEmbed { Deglitcher[IO](100.millis).flatMap { deglitcher => - server(() -> 1.millis, () -> 1.millis, () -> 1.millis) + server(0 -> 1.millis, 1 -> 1.millis, 2 -> 1.millis) .through(deglitcher.debounce) - .evalMap(_ => IO.realTime) + .evalMap(IO.realTime.tupleLeft(_)) .compile .toList } } - .assertEquals(List(1.millis, 2.millis, 3.millis)) + .assertEquals(List(0 -> 1.millis, 1 -> 2.millis, 2 -> 3.millis)) } test("respects throttling") { @@ -53,4 +53,26 @@ class DeglitcherSuite extends CatsEffectSuite { .assertEquals(List(0 -> 0.millis, 2 -> 101.millis, 3 -> 230.millis)) } + test("respects multiple throttles") { + TestControl + .executeEmbed { + Deglitcher[IO](100.millis).flatMap { deglitcher => + server(0 -> 20.millis) + .through(deglitcher.debounce) + .evalMap(IO.realTime.tupleLeft(_)) + .compile + .toList + .background + .use { result => + IO.sleep(10.millis) *> + deglitcher.throttle *> + IO.sleep(40.millis) *> + deglitcher.throttle *> + result.flatMap(_.embedError) + } + } + } + .assertEquals(List(0 -> 150.millis)) + } + } From fcade64aab43bdf6433a14f54dcff3c130cb1361 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 22 Nov 2023 21:43:30 +0000 Subject: [PATCH 3/3] Organize imports --- modules/core/shared/src/main/scala/crystal/Deglitcher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/shared/src/main/scala/crystal/Deglitcher.scala b/modules/core/shared/src/main/scala/crystal/Deglitcher.scala index 8fefe34d..c48df6e2 100644 --- a/modules/core/shared/src/main/scala/crystal/Deglitcher.scala +++ b/modules/core/shared/src/main/scala/crystal/Deglitcher.scala @@ -6,8 +6,8 @@ package crystal import cats.effect.Ref import cats.effect.Temporal import cats.syntax.all._ -import fs2.Stream import fs2.Pipe +import fs2.Stream import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration