Skip to content

Commit

Permalink
Merge pull request #85 from commercetools/idempotent-nack
Browse files Browse the repository at this point in the history
Make ack/nack idempotent on an empty batch
  • Loading branch information
mladens authored Nov 8, 2024
2 parents 3e70b19 + ca39f85 commit d7f254f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ private class SQSMessageBatch[F[_], T](

override def messages: Chunk[Message[F, T]] = payload

override def ackAll: F[List[MessageId]] =
override def ackAll: F[List[MessageId]] = if (payload.isEmpty) {
F.pure(List.empty[MessageId])
} else {
F.fromCompletableFuture {
F.delay {
client.deleteMessageBatch(
Expand All @@ -52,8 +54,11 @@ private class SQSMessageBatch[F[_], T](
)
}
}.map(res => res.failed().asScala.map(message => MessageId(message.id())).toList)
}

override def nackAll: F[List[MessageId]] =
override def nackAll: F[List[MessageId]] = if (payload.isEmpty) {
F.pure(List.empty[MessageId])
} else {
F.fromCompletableFuture {
F.delay {
client.changeMessageVisibilityBatch(
Expand All @@ -74,4 +79,5 @@ private class SQSMessageBatch[F[_], T](
)
}
}.map(res => res.failed().asScala.map(message => MessageId(message.id())).toList)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite =>
}
}

withQueue.test("messageBatch noop ack/nack on empty batches") { queueName =>
val client = clientFixture()
client.subscribe(queueName).puller.use { puller =>
for {
msgBatch <- puller.pullMessageBatch(10, waitingTime)
_ = assertEquals(msgBatch.messages.size, 0)
_ <- msgBatch.nackAll
_ <- msgBatch.ackAll
_ <- assertIOBoolean(
puller.pullMessageBatch(10, waitingTime).map(_.messages.isEmpty)
)
} yield ()
}
}

withQueue.test("process respects the decision from the handler") { queueName =>
val client = clientFixture()
for {
Expand Down

0 comments on commit d7f254f

Please sign in to comment.