Skip to content

Commit

Permalink
Add some tests for the auto-acking feature
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Feb 13, 2024
1 parent d7feba5 commit 75b674a
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 14 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ val commonSettings = List(
libraryDependencies ++= Seq(
"co.fs2" %%% "fs2-core" % Versions.fs2,
"org.scalameta" %%% "munit" % Versions.munit % Test,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect % Test
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect % Test,
"org.typelevel" %%% "cats-collections-core" % "0.9.8" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.3" % Test
)
)

Expand Down
22 changes: 9 additions & 13 deletions core/src/main/scala/de/commercetools/queue/QueueSubscriber.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package de.commercetools.queue

import cats.data.Chain
import cats.effect.IO
import fs2.{Chunk, Stream}

import scala.concurrent.duration.FiniteDuration
import fs2.Pull
import cats.effect.kernel.Outcome
import cats.syntax.all._
import fs2.{Chunk, Pull, Stream}

import scala.concurrent.duration.FiniteDuration

/**
* The base interface to subscribe to a queue.
Expand Down Expand Up @@ -47,11 +45,10 @@ trait QueueSubscriber[T] {
// to have full control over nacking things in time after a failure, and emitting
// results up to the error, we resort to a `Pull`, which allows this fine graind control
// over pulling/emitting/failing
// it keeps the chunk structure as best as it can
def doChunk(chunk: Chunk[MessageContext[T]], idx: Int, acc: Chain[Res]): Pull[IO, Res, Unit] =
def doChunk(chunk: Chunk[MessageContext[T]], idx: Int): Pull[IO, Res, Unit] =
if (idx >= chunk.size) {
// we are done, emit the chunk
Pull.output(Chunk.chain(acc))
Pull.done

} else {
val ctx = chunk(idx)
Expand All @@ -66,15 +63,14 @@ trait QueueSubscriber[T] {
})
.attempt
.flatMap {
case Right(res) => doChunk(chunk, idx + 1, acc.append(res))
case Right(res) => Pull.output1(res) >> doChunk(chunk, idx + 1)
case Left(t) =>
// one processing failed, emit everything that was processed in the chunk
// and fail
Pull.output(Chunk.chain(acc)).covary[IO] *> Pull.raiseError[IO](t)
// one processing failed
Pull.raiseError[IO](t)
}
}
messages(batchSize, waitingTime).repeatPull(_.uncons.flatMap {
case Some((hd, tl)) => doChunk(hd, 0, Chain.empty).as(Some(tl))
case Some((hd, tl)) => doChunk(hd, 0).as(Some(tl))
case None => Pull.pure(None)
})
}
Expand Down
84 changes: 84 additions & 0 deletions core/src/test/scala/de/commercetools/queue/SubscriberSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package de.commercetools.queue

import cats.collections.Heap
import cats.effect.IO
import cats.effect.std.AtomicCell
import cats.effect.testkit.TestControl
import cats.syntax.traverse._
import de.commercetools.queue.testing._
import munit.CatsEffectSuite

import scala.concurrent.duration._
import cats.syntax.either._

class SubscriberSuite extends CatsEffectSuite {

val queueSub = ResourceFixture(
AtomicCell[IO]
.of(testing.QueueState[String](Heap.empty, List.empty, Map.empty))
.map { state =>
val queue = new TestQueue[String](state = state, messageTTL = 15.minutes, lockTTL = 1.minute)
(queue, new TestQueueSubscriber(queue))
}
.toResource)

queueSub.test("Successful messages must be acked") { case (queue, subscriber) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _))
}
_ <- queue.setAvailableMessages(messages)
// then process messages in batches of 5
// processing is (virtually) instantaneous in this case,
// so messages are immediately acked, from the mocked time PoV
// however, receiving messages waits for the amount of provided `waitingTime`
// in the test queue implementation, event if enough messages are available
// so this step makes time progress in steps of `waitingTime`
result <- subscriber
.processWithAutoAck(batchSize = 5, waitingTime = 40.millis)(_ => IO.pure(1))
.interruptAfter(3.seconds)
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(IO.pure(result), 100)
_ <- assertIO(queue.getAvailableMessages, Nil)
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield ()
}
}

queueSub.test("Messages must be unack'ed if processing fails and emit everything up to failure") {
case (queue, subscriber) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _))
}
_ <- queue.setAvailableMessages(messages)
result <- subscriber
// take all messages in one big batch
.processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m =>
IO.raiseWhen(m == "message-43")(new Exception("BOOM")).as(m))
.attempt
.compile
.toList
} yield (messages, result))
.flatMap { case (originals, result) =>
for {
// check that all messages were consumed up to message #43
_ <- assertIO(IO.pure(result.init), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM"))
_ <- assertIO(queue.getAvailableMessages, originals.drop(43))
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield ()
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package de.commercetools.queue.testing

import cats.effect.IO
import cats.effect.std.AtomicCell
import de.commercetools.queue.MessageContext

import java.time.Instant
import java.util.UUID
import scala.concurrent.duration.FiniteDuration

final case class LockedTestMessage[T](
lock: UUID,
msg: TestMessage[T],
lockedUntil: Instant,
lockTTL: FiniteDuration,
state: AtomicCell[IO, QueueState[T]])
extends MessageContext[T] {

override def payload: T = msg.payload

override def enqueuedAt: Instant = msg.enqueuedAt

override val metadata: Map[String, String] = Map.empty

override def ack(): IO[Unit] =
// done, just delete it
state.update { state =>
state.copy(locked = state.locked.removed(lock))
}

override def nack(): IO[Unit] =
// move it back to available
state.update { state =>
state.locked.get(lock) match {
case Some(msg) =>
state.copy(locked = state.locked.removed(lock), available = state.available.add(msg.msg))
case None =>
state
}
}

override def extendLock(): IO[Unit] =
state.evalUpdate { state =>
state.locked.get(lock) match {
case Some(msg) =>
IO.realTimeInstant.map { now =>
state.copy(locked = state.locked.updated(lock, msg.copy(lockedUntil = now.plusMillis(lockTTL.toMillis))))
}
case None =>
IO.pure(state)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package de.commercetools.queue.testing

import cats.collections.Heap

import java.util.UUID

final case class QueueState[T](available: Heap[TestMessage[T]], delayed: List[TestMessage[T]], locked: Map[UUID, LockedTestMessage[T]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package de.commercetools.queue.testing

import java.time.Instant
import cats.Order

final case class TestMessage[T](payload: T, enqueuedAt: Instant)

object TestMessage {

implicit def order[T]: Order[TestMessage[T]] = Order.by(_.enqueuedAt.toEpochMilli())

}
119 changes: 119 additions & 0 deletions core/src/test/scala/de/commercetools/queue/testing/TestQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package de.commercetools.queue.testing

import cats.collections.Heap
import cats.data.Chain
import cats.effect.IO
import cats.effect.std.AtomicCell
import cats.syntax.traverse._
import fs2.Chunk

import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration

class TestQueue[T](
state: AtomicCell[IO, QueueState[T]],
val messageTTL: FiniteDuration,
val lockTTL: FiniteDuration) {

// updates the given state with the current situation at `now`
// this is used to have a fresh view of the queue whenever it is accessed
// for publication, subscription, or to get the current state for checks
private def update(state: QueueState[T]): IO[QueueState[T]] = IO.realTimeInstant.map { now =>
// put back expired locked messages in the available messages
val (stillLocked, toUnlock) = state.locked.partitionMap {
case e @ (_, locked) if locked.lockedUntil.isAfter(now) => Left(e)
case (_, locked) => Right(locked.msg)
}
val withUnlocked = state.available.addAll(toUnlock)
// put delayed messages for which delay expired
val (stillDelayed, toPublish) = state.delayed.partition(_.enqueuedAt.isAfter(now))
val withDelayed = withUnlocked.addAll(toPublish)
// now remove all messages for which global TTL is expired
val withoutExpired = withDelayed.foldLeft(Heap.empty[TestMessage[T]]) { (acc, msg) =>
if (msg.enqueuedAt.plusMillis(messageTTL.toMillis).isAfter(now)) {
// still valid
acc.add(msg)
} else {
// expired, drop it
acc
}
}
state.copy(available = withoutExpired, locked = stillLocked.toMap, delayed = stillDelayed)

}

def getState: IO[QueueState[T]] =
state.evalGetAndUpdate(update(_))

def setAvailableMessages(messages: List[TestMessage[T]]): IO[Unit] =
state.update(_.copy(available = Heap.fromIterable(messages)))

def getAvailableMessages: IO[List[TestMessage[T]]] =
getState.map(_.available.toList)

def setDelayedMessages(messages: List[TestMessage[T]]): IO[Unit] =
state.update(_.copy(delayed = messages))

def getDelayedMessages: IO[List[TestMessage[T]]] =
getState.map(_.delayed.toList)

def setLockedMessages(messages: List[LockedTestMessage[T]]): IO[Unit] =
state.update(_.copy(locked = messages.map(m => m.lock -> m).toMap))

def getLockedMessages: IO[List[LockedTestMessage[T]]] =
getState.map(_.locked.values.toList)

private def take(n: Int, available: Heap[TestMessage[T]]): (Chain[TestMessage[T]], Heap[TestMessage[T]]) = {
@tailrec
def loop(n: Int, available: Heap[TestMessage[T]], acc: Chain[TestMessage[T]])
: (Chain[TestMessage[T]], Heap[TestMessage[T]]) =
if (n <= 0) {
(acc, available)
} else {
available.getMin match {
case Some(msg) => loop(n - 1, available.remove, acc.append(msg))
case None => (acc, available)
}
}
loop(n, available, Chain.empty)
}

def lockMessages(n: Int): IO[Chunk[LockedTestMessage[T]]] =
state.evalModify { state =>
for {
now <- IO.realTimeInstant
state <- update(state)
// now lock the first `batchSize` available messages
(batch, stillAvailable) = take(n, state.available)
// create the locked messages out of the batch
newlyLocked <- batch
.traverse(msg =>
(IO.randomUUID).map(lock =>
(
lock,
LockedTestMessage(
lock = lock,
msg = msg,
lockedUntil = now.plusMillis(lockTTL.toMillis),
lockTTL = lockTTL,
state = this.state))))
} yield (
state.copy(available = stillAvailable, locked = state.locked ++ newlyLocked.iterator),
Chunk.chain(newlyLocked.map(_._2)))
}

def enqeueMessages(messages: List[T], delay: Option[FiniteDuration]) =
state.evalUpdate { state =>
for {
now <- IO.realTimeInstant
state <- update(state)
} yield delay match {
case None =>
state.copy(available = state.available.addAll(messages.map(TestMessage(_, now))))
case Some(delay) =>
val delayed = now.plusMillis(delay.toMillis)
state.copy(delayed = messages.map(TestMessage(_, delayed)) reverse_::: state.delayed)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package de.commercetools.queue.testing

import cats.effect.IO
import de.commercetools.queue.QueuePublisher

import scala.concurrent.duration.FiniteDuration

class TestQueuePublisher[T](queue: TestQueue[T]) extends QueuePublisher[T] {

override def publish(message: T, delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(message :: Nil, delay)

override def publish(messages: List[T], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(messages, delay)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.commercetools.queue.testing

import cats.effect.IO
import de.commercetools.queue.{MessageContext, QueueSubscriber}
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[T] {

override def messages(batchSize: Int, waitingTime: FiniteDuration): fs2.Stream[IO, MessageContext[T]] =
(Stream.sleep_[IO](waitingTime) ++
Stream
.eval(queue.lockMessages(batchSize))
.unchunks).repeat

}

0 comments on commit 75b674a

Please sign in to comment.