From 75b674a4066caa57d982d4b202c6b2e8c82a7569 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 13 Feb 2024 18:20:57 +0100 Subject: [PATCH] Add some tests for the auto-acking feature --- build.sbt | 4 +- .../commercetools/queue/QueueSubscriber.scala | 22 ++-- .../commercetools/queue/SubscriberSuite.scala | 84 +++++++++++++ .../queue/testing/LockedTestMessage.scala | 54 ++++++++ .../queue/testing/QueueState.scala | 7 ++ .../queue/testing/TestMessage.scala | 12 ++ .../queue/testing/TestQueue.scala | 119 ++++++++++++++++++ .../queue/testing/TestQueuePublisher.scala | 16 +++ .../queue/testing/TestQueueSubscriber.scala | 17 +++ 9 files changed, 321 insertions(+), 14 deletions(-) create mode 100644 core/src/test/scala/de/commercetools/queue/SubscriberSuite.scala create mode 100644 core/src/test/scala/de/commercetools/queue/testing/LockedTestMessage.scala create mode 100644 core/src/test/scala/de/commercetools/queue/testing/QueueState.scala create mode 100644 core/src/test/scala/de/commercetools/queue/testing/TestMessage.scala create mode 100644 core/src/test/scala/de/commercetools/queue/testing/TestQueue.scala create mode 100644 core/src/test/scala/de/commercetools/queue/testing/TestQueuePublisher.scala create mode 100644 core/src/test/scala/de/commercetools/queue/testing/TestQueueSubscriber.scala diff --git a/build.sbt b/build.sbt index 917132c..26187a8 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,9 @@ val commonSettings = List( libraryDependencies ++= Seq( "co.fs2" %%% "fs2-core" % Versions.fs2, "org.scalameta" %%% "munit" % Versions.munit % Test, - "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect % Test + "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect % Test, + "org.typelevel" %%% "cats-collections-core" % "0.9.8" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.5.3" % Test ) ) diff --git a/core/src/main/scala/de/commercetools/queue/QueueSubscriber.scala b/core/src/main/scala/de/commercetools/queue/QueueSubscriber.scala index 29d67e2..32e946f 100644 --- a/core/src/main/scala/de/commercetools/queue/QueueSubscriber.scala +++ b/core/src/main/scala/de/commercetools/queue/QueueSubscriber.scala @@ -1,13 +1,11 @@ package de.commercetools.queue -import cats.data.Chain import cats.effect.IO -import fs2.{Chunk, Stream} - -import scala.concurrent.duration.FiniteDuration -import fs2.Pull import cats.effect.kernel.Outcome import cats.syntax.all._ +import fs2.{Chunk, Pull, Stream} + +import scala.concurrent.duration.FiniteDuration /** * The base interface to subscribe to a queue. @@ -47,11 +45,10 @@ trait QueueSubscriber[T] { // to have full control over nacking things in time after a failure, and emitting // results up to the error, we resort to a `Pull`, which allows this fine graind control // over pulling/emitting/failing - // it keeps the chunk structure as best as it can - def doChunk(chunk: Chunk[MessageContext[T]], idx: Int, acc: Chain[Res]): Pull[IO, Res, Unit] = + def doChunk(chunk: Chunk[MessageContext[T]], idx: Int): Pull[IO, Res, Unit] = if (idx >= chunk.size) { // we are done, emit the chunk - Pull.output(Chunk.chain(acc)) + Pull.done } else { val ctx = chunk(idx) @@ -66,15 +63,14 @@ trait QueueSubscriber[T] { }) .attempt .flatMap { - case Right(res) => doChunk(chunk, idx + 1, acc.append(res)) + case Right(res) => Pull.output1(res) >> doChunk(chunk, idx + 1) case Left(t) => - // one processing failed, emit everything that was processed in the chunk - // and fail - Pull.output(Chunk.chain(acc)).covary[IO] *> Pull.raiseError[IO](t) + // one processing failed + Pull.raiseError[IO](t) } } messages(batchSize, waitingTime).repeatPull(_.uncons.flatMap { - case Some((hd, tl)) => doChunk(hd, 0, Chain.empty).as(Some(tl)) + case Some((hd, tl)) => doChunk(hd, 0).as(Some(tl)) case None => Pull.pure(None) }) } diff --git a/core/src/test/scala/de/commercetools/queue/SubscriberSuite.scala b/core/src/test/scala/de/commercetools/queue/SubscriberSuite.scala new file mode 100644 index 0000000..76222a6 --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/SubscriberSuite.scala @@ -0,0 +1,84 @@ +package de.commercetools.queue + +import cats.collections.Heap +import cats.effect.IO +import cats.effect.std.AtomicCell +import cats.effect.testkit.TestControl +import cats.syntax.traverse._ +import de.commercetools.queue.testing._ +import munit.CatsEffectSuite + +import scala.concurrent.duration._ +import cats.syntax.either._ + +class SubscriberSuite extends CatsEffectSuite { + + val queueSub = ResourceFixture( + AtomicCell[IO] + .of(testing.QueueState[String](Heap.empty, List.empty, Map.empty)) + .map { state => + val queue = new TestQueue[String](state = state, messageTTL = 15.minutes, lockTTL = 1.minute) + (queue, new TestQueueSubscriber(queue)) + } + .toResource) + + queueSub.test("Successful messages must be acked") { case (queue, subscriber) => + TestControl + .executeEmbed(for { + // first populate the queue + messages <- List.range(0, 100).traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _)) + } + _ <- queue.setAvailableMessages(messages) + // then process messages in batches of 5 + // processing is (virtually) instantaneous in this case, + // so messages are immediately acked, from the mocked time PoV + // however, receiving messages waits for the amount of provided `waitingTime` + // in the test queue implementation, event if enough messages are available + // so this step makes time progress in steps of `waitingTime` + result <- subscriber + .processWithAutoAck(batchSize = 5, waitingTime = 40.millis)(_ => IO.pure(1)) + .interruptAfter(3.seconds) + .compile + .foldMonoid + } yield result) + .flatMap { result => + for { + _ <- assertIO(IO.pure(result), 100) + _ <- assertIO(queue.getAvailableMessages, Nil) + _ <- assertIO(queue.getLockedMessages, Nil) + _ <- assertIO(queue.getDelayedMessages, Nil) + } yield () + } + } + + queueSub.test("Messages must be unack'ed if processing fails and emit everything up to failure") { + case (queue, subscriber) => + TestControl + .executeEmbed(for { + // first populate the queue + messages <- List.range(0, 100).traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _)) + } + _ <- queue.setAvailableMessages(messages) + result <- subscriber + // take all messages in one big batch + .processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m => + IO.raiseWhen(m == "message-43")(new Exception("BOOM")).as(m)) + .attempt + .compile + .toList + } yield (messages, result)) + .flatMap { case (originals, result) => + for { + // check that all messages were consumed up to message #43 + _ <- assertIO(IO.pure(result.init), originals.take(43).map(m => Right(m.payload))) + _ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM")) + _ <- assertIO(queue.getAvailableMessages, originals.drop(43)) + _ <- assertIO(queue.getLockedMessages, Nil) + _ <- assertIO(queue.getDelayedMessages, Nil) + } yield () + } + } + +} diff --git a/core/src/test/scala/de/commercetools/queue/testing/LockedTestMessage.scala b/core/src/test/scala/de/commercetools/queue/testing/LockedTestMessage.scala new file mode 100644 index 0000000..c99197a --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/testing/LockedTestMessage.scala @@ -0,0 +1,54 @@ +package de.commercetools.queue.testing + +import cats.effect.IO +import cats.effect.std.AtomicCell +import de.commercetools.queue.MessageContext + +import java.time.Instant +import java.util.UUID +import scala.concurrent.duration.FiniteDuration + +final case class LockedTestMessage[T]( + lock: UUID, + msg: TestMessage[T], + lockedUntil: Instant, + lockTTL: FiniteDuration, + state: AtomicCell[IO, QueueState[T]]) + extends MessageContext[T] { + + override def payload: T = msg.payload + + override def enqueuedAt: Instant = msg.enqueuedAt + + override val metadata: Map[String, String] = Map.empty + + override def ack(): IO[Unit] = + // done, just delete it + state.update { state => + state.copy(locked = state.locked.removed(lock)) + } + + override def nack(): IO[Unit] = + // move it back to available + state.update { state => + state.locked.get(lock) match { + case Some(msg) => + state.copy(locked = state.locked.removed(lock), available = state.available.add(msg.msg)) + case None => + state + } + } + + override def extendLock(): IO[Unit] = + state.evalUpdate { state => + state.locked.get(lock) match { + case Some(msg) => + IO.realTimeInstant.map { now => + state.copy(locked = state.locked.updated(lock, msg.copy(lockedUntil = now.plusMillis(lockTTL.toMillis)))) + } + case None => + IO.pure(state) + } + } + +} diff --git a/core/src/test/scala/de/commercetools/queue/testing/QueueState.scala b/core/src/test/scala/de/commercetools/queue/testing/QueueState.scala new file mode 100644 index 0000000..56f1162 --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/testing/QueueState.scala @@ -0,0 +1,7 @@ +package de.commercetools.queue.testing + +import cats.collections.Heap + +import java.util.UUID + +final case class QueueState[T](available: Heap[TestMessage[T]], delayed: List[TestMessage[T]], locked: Map[UUID, LockedTestMessage[T]]) diff --git a/core/src/test/scala/de/commercetools/queue/testing/TestMessage.scala b/core/src/test/scala/de/commercetools/queue/testing/TestMessage.scala new file mode 100644 index 0000000..d6727ed --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/testing/TestMessage.scala @@ -0,0 +1,12 @@ +package de.commercetools.queue.testing + +import java.time.Instant +import cats.Order + +final case class TestMessage[T](payload: T, enqueuedAt: Instant) + +object TestMessage { + + implicit def order[T]: Order[TestMessage[T]] = Order.by(_.enqueuedAt.toEpochMilli()) + +} diff --git a/core/src/test/scala/de/commercetools/queue/testing/TestQueue.scala b/core/src/test/scala/de/commercetools/queue/testing/TestQueue.scala new file mode 100644 index 0000000..e5808ec --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/testing/TestQueue.scala @@ -0,0 +1,119 @@ +package de.commercetools.queue.testing + +import cats.collections.Heap +import cats.data.Chain +import cats.effect.IO +import cats.effect.std.AtomicCell +import cats.syntax.traverse._ +import fs2.Chunk + +import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration + +class TestQueue[T]( + state: AtomicCell[IO, QueueState[T]], + val messageTTL: FiniteDuration, + val lockTTL: FiniteDuration) { + + // updates the given state with the current situation at `now` + // this is used to have a fresh view of the queue whenever it is accessed + // for publication, subscription, or to get the current state for checks + private def update(state: QueueState[T]): IO[QueueState[T]] = IO.realTimeInstant.map { now => + // put back expired locked messages in the available messages + val (stillLocked, toUnlock) = state.locked.partitionMap { + case e @ (_, locked) if locked.lockedUntil.isAfter(now) => Left(e) + case (_, locked) => Right(locked.msg) + } + val withUnlocked = state.available.addAll(toUnlock) + // put delayed messages for which delay expired + val (stillDelayed, toPublish) = state.delayed.partition(_.enqueuedAt.isAfter(now)) + val withDelayed = withUnlocked.addAll(toPublish) + // now remove all messages for which global TTL is expired + val withoutExpired = withDelayed.foldLeft(Heap.empty[TestMessage[T]]) { (acc, msg) => + if (msg.enqueuedAt.plusMillis(messageTTL.toMillis).isAfter(now)) { + // still valid + acc.add(msg) + } else { + // expired, drop it + acc + } + } + state.copy(available = withoutExpired, locked = stillLocked.toMap, delayed = stillDelayed) + + } + + def getState: IO[QueueState[T]] = + state.evalGetAndUpdate(update(_)) + + def setAvailableMessages(messages: List[TestMessage[T]]): IO[Unit] = + state.update(_.copy(available = Heap.fromIterable(messages))) + + def getAvailableMessages: IO[List[TestMessage[T]]] = + getState.map(_.available.toList) + + def setDelayedMessages(messages: List[TestMessage[T]]): IO[Unit] = + state.update(_.copy(delayed = messages)) + + def getDelayedMessages: IO[List[TestMessage[T]]] = + getState.map(_.delayed.toList) + + def setLockedMessages(messages: List[LockedTestMessage[T]]): IO[Unit] = + state.update(_.copy(locked = messages.map(m => m.lock -> m).toMap)) + + def getLockedMessages: IO[List[LockedTestMessage[T]]] = + getState.map(_.locked.values.toList) + + private def take(n: Int, available: Heap[TestMessage[T]]): (Chain[TestMessage[T]], Heap[TestMessage[T]]) = { + @tailrec + def loop(n: Int, available: Heap[TestMessage[T]], acc: Chain[TestMessage[T]]) + : (Chain[TestMessage[T]], Heap[TestMessage[T]]) = + if (n <= 0) { + (acc, available) + } else { + available.getMin match { + case Some(msg) => loop(n - 1, available.remove, acc.append(msg)) + case None => (acc, available) + } + } + loop(n, available, Chain.empty) + } + + def lockMessages(n: Int): IO[Chunk[LockedTestMessage[T]]] = + state.evalModify { state => + for { + now <- IO.realTimeInstant + state <- update(state) + // now lock the first `batchSize` available messages + (batch, stillAvailable) = take(n, state.available) + // create the locked messages out of the batch + newlyLocked <- batch + .traverse(msg => + (IO.randomUUID).map(lock => + ( + lock, + LockedTestMessage( + lock = lock, + msg = msg, + lockedUntil = now.plusMillis(lockTTL.toMillis), + lockTTL = lockTTL, + state = this.state)))) + } yield ( + state.copy(available = stillAvailable, locked = state.locked ++ newlyLocked.iterator), + Chunk.chain(newlyLocked.map(_._2))) + } + + def enqeueMessages(messages: List[T], delay: Option[FiniteDuration]) = + state.evalUpdate { state => + for { + now <- IO.realTimeInstant + state <- update(state) + } yield delay match { + case None => + state.copy(available = state.available.addAll(messages.map(TestMessage(_, now)))) + case Some(delay) => + val delayed = now.plusMillis(delay.toMillis) + state.copy(delayed = messages.map(TestMessage(_, delayed)) reverse_::: state.delayed) + } + } + +} diff --git a/core/src/test/scala/de/commercetools/queue/testing/TestQueuePublisher.scala b/core/src/test/scala/de/commercetools/queue/testing/TestQueuePublisher.scala new file mode 100644 index 0000000..e51d7be --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/testing/TestQueuePublisher.scala @@ -0,0 +1,16 @@ +package de.commercetools.queue.testing + +import cats.effect.IO +import de.commercetools.queue.QueuePublisher + +import scala.concurrent.duration.FiniteDuration + +class TestQueuePublisher[T](queue: TestQueue[T]) extends QueuePublisher[T] { + + override def publish(message: T, delay: Option[FiniteDuration]): IO[Unit] = + queue.enqeueMessages(message :: Nil, delay) + + override def publish(messages: List[T], delay: Option[FiniteDuration]): IO[Unit] = + queue.enqeueMessages(messages, delay) + +} diff --git a/core/src/test/scala/de/commercetools/queue/testing/TestQueueSubscriber.scala b/core/src/test/scala/de/commercetools/queue/testing/TestQueueSubscriber.scala new file mode 100644 index 0000000..e37b1ee --- /dev/null +++ b/core/src/test/scala/de/commercetools/queue/testing/TestQueueSubscriber.scala @@ -0,0 +1,17 @@ +package de.commercetools.queue.testing + +import cats.effect.IO +import de.commercetools.queue.{MessageContext, QueueSubscriber} +import fs2.Stream + +import scala.concurrent.duration.FiniteDuration + +class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[T] { + + override def messages(batchSize: Int, waitingTime: FiniteDuration): fs2.Stream[IO, MessageContext[T]] = + (Stream.sleep_[IO](waitingTime) ++ + Stream + .eval(queue.lockMessages(batchSize)) + .unchunks).repeat + +}