Skip to content

Commit

Permalink
Add SQS support
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Feb 8, 2024
1 parent 3d8a402 commit ee86bc8
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 19 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def program(client: QueueClient): IO[Unit] = {
val queueName = "my-queue"
client.publisher[String](queueName).use { publisher =>
// subscribe and publish concurrently
subscribeStream(client.subscriber[String](queueName))
subscribeStream(client.subscriber[String](queueName, 30.seconds))
.concurrently(publishStream(publisher))
.compile
// runs forever
Expand All @@ -62,7 +62,7 @@ def program(client: QueueClient): IO[Unit] = {

```scala
import de.commercetools.queue.azure.servicebus._
import com.azure.identity._
import com.azure.identity.DefaultAzureCredentialBuilder

val namespace = "{namespace}.servicebus.windows.net" // your namespace
val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate
Expand All @@ -72,8 +72,14 @@ ServiceBusClient(namespace, credentials).use(program(_))

## Working with AWS SQS

**TODO**

```scala
import de.commercetools.queue.aws.sqs._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

val region = Region.US_EAST_1 // your region
val credentials = DefaultCredentialsProvider.create() // however you want to authenticate

SQSClient(region, credentials).use(program(_))
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package de.commercetools.queue.aws.sqs

import cats.effect.IO
import de.commercetools.queue.QueueAdministration
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, QueueAttributeName}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException

class SQSAdministration(client: SqsAsyncClient, getQueueUrl: String => IO[String]) extends QueueAdministration {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): IO[Unit] =
IO.fromCompletableFuture {
IO {
client.createQueue(
CreateQueueRequest
.builder()
.queueName(name)
.attributes(Map(
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> messageTTL.toSeconds.toString(),
QueueAttributeName.VISIBILITY_TIMEOUT -> lockTTL.toSeconds.toString()).asJava)
.build())
}
}.void

override def delete(name: String): IO[Unit] =
getQueueUrl(name).flatMap { queueUrl =>
IO.fromCompletableFuture {
IO {
client.deleteQueue(
DeleteQueueRequest
.builder()
.queueUrl(queueUrl)
.build())
}
}
}.void

override def exists(name: String): IO[Boolean] =
getQueueUrl(name).as(true).recover { _: QueueDoesNotExistException => false }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package de.commercetools.queue.aws.sqs

import cats.effect.{IO, Resource}
import de.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest

import java.net.URI
import scala.concurrent.duration.FiniteDuration

class SQSClient private (client: SqsAsyncClient) extends QueueClient {

private def getQueueUrl(name: String): IO[String] =
IO.fromCompletableFuture {
IO {
client.getQueueUrl(GetQueueUrlRequest.builder().queueName(name).build())
}
}.map(_.queueUrl)

override def administration: QueueAdministration =
new SQSAdministration(client, getQueueUrl(_))

override def publisher[T: Serializer](name: String): Resource[IO, QueuePublisher[T]] =
Resource.eval(getQueueUrl(name).map(new SQSPublisher(_, client)))

override def subscriber[T: Deserializer](name: String, lockTTL: FiniteDuration): QueueSubscriber[T] =
new SQSSubscriber[T](lockTTL, getQueueUrl(name), client)

}

object SQSClient {

def apply(
region: Region,
credentials: AwsCredentialsProvider,
endpoint: Option[URI] = None,
httpClient: Option[SdkAsyncHttpClient] = None
): Resource[IO, SQSClient] =
Resource
.fromAutoCloseable {
IO {
val builder =
SqsAsyncClient.builder().region(region).credentialsProvider(credentials)

endpoint.foreach(builder.endpointOverride(_))

httpClient.foreach(builder.httpClient(_))

builder.build()
}
}
.map(new SQSClient(_))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package de.commercetools.queue.aws.sqs

import scala.concurrent.duration.FiniteDuration
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import de.commercetools.queue.MessageContext
import java.time.Instant
import cats.effect.IO
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest

class SQSMessageContext[T](
val payload: T,
val enqueuedAt: Instant,
val metadata: Map[String, String],
receiptHandle: String,
lockTTL: FiniteDuration,
queueUrl: String,
client: SqsAsyncClient)
extends MessageContext[T] {

override def ack(): IO[Unit] =
IO.fromCompletableFuture {
IO {
client.deleteMessage(DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).build())
}
}.void

override def nack(): IO[Unit] =
IO.fromCompletableFuture {
IO {
client.changeMessageVisibility(
ChangeMessageVisibilityRequest
.builder()
.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.visibilityTimeout(0)
.build())
}
}.void

override def extendLock(): IO[Unit] =
IO.fromCompletableFuture {
IO {
client.changeMessageVisibility(
ChangeMessageVisibilityRequest
.builder()
.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.visibilityTimeout(lockTTL.toSeconds.toInt)
.build())
}
}.void

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package de.commercetools.queue.aws.sqs

import cats.effect.IO
import de.commercetools.queue.{QueuePublisher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest

import scala.concurrent.duration.FiniteDuration
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry

class SQSPublisher[T](queueUrl: String, client: SqsAsyncClient)(implicit serializer: Serializer[T])
extends QueuePublisher[T] {

override def publish(message: T, delay: Option[FiniteDuration]): IO[Unit] =
IO.fromCompletableFuture {
IO {
client.sendMessage(
SendMessageRequest
.builder()
.queueUrl(queueUrl)
.messageBody(serializer.serialize(message))
.delaySeconds(delay.fold(0)(_.toSeconds.toInt))
.build())
}
}.void

override def publish(messages: List[T], delay: Option[FiniteDuration]): IO[Unit] =
IO.fromCompletableFuture {
IO {
val delaySeconds = delay.fold(0)(_.toSeconds.toInt)
client.sendMessageBatch(
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.map { message =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.delaySeconds(delaySeconds)
.build()
}: _*)
.build())
}
}.void

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package de.commercetools.queue.aws.sqs

import cats.effect.IO
import de.commercetools.queue.{Deserializer, MessageContext, QueueSubscriber}
import fs2.{Chunk, Stream}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, ReceiveMessageRequest}

import java.time.Instant
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSSubscriber[T](
lockTTL: FiniteDuration,
getQueueUrl: IO[String],
client: SqsAsyncClient
)(implicit deserializer: Deserializer[T])
extends QueueSubscriber[T] {

override def messages(batchSize: Int, waitingTime: FiniteDuration): Stream[IO, MessageContext[T]] =
Stream.eval(getQueueUrl).flatMap { queueUrl =>
Stream
.repeatEval(IO.fromCompletableFuture {
IO {
client.receiveMessage(
ReceiveMessageRequest
.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(batchSize)
.visibilityTimeout(lockTTL.toSeconds.toInt)
.waitTimeSeconds(waitingTime.toSeconds.toInt)
.attributeNamesWithStrings(MessageSystemAttributeName.SENT_TIMESTAMP.toString())
.build())
}
})
.map { messages =>
Chunk.from(messages.messages().asScala)
}
.unchunks
.evalMap { message =>
for {
sentTimestamp <- IO(
Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong))
data <- deserializer.deserialize(message.body())
} yield new SQSMessageContext(
data,
sentTimestamp,
message.attributesAsStrings().asScala.toMap,
message.receiptHandle(),
lockTTL,
queueUrl,
client)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import de.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}

import scala.concurrent.duration.FiniteDuration

class ServiceBusClient private (
clientBuilder: ServiceBusClientBuilder,
adminBuilder: ServiceBusAdministrationClientBuilder)
Expand All @@ -20,7 +22,7 @@ class ServiceBusClient private (
sender <- Resource.make(IO(clientBuilder.sender().queueName(name).buildAsyncClient()))(s => IO(s.close()))
} yield new ServiceBusQueuePublisher[T](sender)

override def subscriber[T: Deserializer](name: String): QueueSubscriber[T] =
override def subscriber[T: Deserializer](name: String, lockTTL: FiniteDuration): QueueSubscriber[T] =
new ServiceBusQueueSubscriber[T](name, clientBuilder)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import cats.effect.IO
import cats.syntax.all._
import com.azure.messaging.servicebus.{ServiceBusMessage, ServiceBusSenderAsyncClient}
import de.commercetools.queue.{QueuePublisher, Serializer}
import fs2.Pipe

import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
Expand All @@ -26,9 +25,4 @@ class ServiceBusQueuePublisher[Data](sender: ServiceBusSenderAsyncClient)(implic
fromBlockingMono(sender.sendMessages(sbMessages.asJava)).void
}

override def sink(chunkSize: Int): Pipe[IO, Data, Nothing] =
_.chunkN(chunkSize).foreach { chunk =>
publish(chunk.toList, None)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ServiceBusQueueSubscriber[Data](
fromPublisher[IO, ServiceBusReceivedMessage](receiver.receiveMessages(), 1)
.groupWithin(batchSize, waitingTime)
.unchunks
.evalMapChunk { sbMessage =>
.evalMap { sbMessage =>
deserializer.deserialize(sbMessage.getBody().toString()).map { data =>
new ServiceBusMessageContext(data, sbMessage, receiver)
}
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ lazy val awsSQS = crossProject(JVMPlatform)
.settings(
name := "cloud-queues-aws-sqs",
libraryDependencies ++= List(
"io.laserdisc" %%% "fs2-aws-sqs" % "6.1.1"
"software.amazon.awssdk" % "sqs" % "2.18.35"
)
)
.dependsOn(core)
Expand All @@ -80,4 +80,4 @@ lazy val readme = project
libraryDependencies ++= List(
"com.azure" % "azure-identity" % "1.11.1"
))
.dependsOn(azureServiceBus.jvm)
.dependsOn(azureServiceBus.jvm, awsSQS.jvm)
4 changes: 3 additions & 1 deletion core/src/main/scala/de/commercetools/queue/QueueClient.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.commercetools.queue

import cats.effect.{IO, Resource}
import scala.concurrent.duration.FiniteDuration

/**
* The entry point to using queues.
Expand All @@ -21,7 +22,8 @@ trait QueueClient {

/**
* Creates a subscriber of the queue.
* The `lockTTL` is used by the subscriber to renew lock and by the auto-renewal feature.
*/
def subscriber[T: Deserializer](name: String): QueueSubscriber[T]
def subscriber[T: Deserializer](name: String, lockTTL: FiniteDuration): QueueSubscriber[T]

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ trait QueuePublisher[T] {
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10): Pipe[IO, T, Nothing]
def sink(batchSize: Int = 10): Pipe[IO, T, Nothing] =
_.chunkN(batchSize).foreach { chunk =>
publish(chunk.toList, None)
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package de.commercetools.queue

import fs2.Stream
import cats.effect.IO
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

/**
Expand Down
Loading

0 comments on commit ee86bc8

Please sign in to comment.