Skip to content

Commit

Permalink
Merge pull request #1480 from bradovitt/elasticmq
Browse files Browse the repository at this point in the history
Elasticmq
  • Loading branch information
davidmartinezbarua authored Aug 11, 2024
2 parents edd8f27 + cce3657 commit 7be23f8
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 0 deletions.
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,12 @@ lazy val scala_libraries = (project in file("scala-libraries"))
"nl.gn0s1s" %% "elastic4s-core" % elastic4sVersion,
logback
),
libraryDependencies ++= Seq(
"org.elasticmq" %% "elasticmq-core" % "1.6.5",
"org.elasticmq" %% "elasticmq-server" % "1.6.5",
"org.elasticmq" %% "elasticmq-rest-sqs" % "1.6.5",
"software.amazon.awssdk" % "sqs" % "2.26.24"
),
Defaults.itSettings
)

Expand Down
52 changes: 52 additions & 0 deletions scala-libraries/src/main/resources/elasticmq.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# What is the outside visible address of this ElasticMQ node
# Used to create the queue URL (may be different from bind address!)
node-address {
protocol = http
host = localhost
port = 9324
context-path = ""
}

rest-sqs {
enabled = true
bind-port = 9324
bind-hostname = "0.0.0.0"
// Possible values: relaxed, strict
sqs-limits = strict
}

rest-stats {
enabled = true
bind-port = 9325
bind-hostname = "0.0.0.0"
}

// Should the node-address be generated from the bind port/hostname
// Set this to true e.g. when assigning port automatically by using port 0.
generate-node-address = false

queues {
queue1 {
defaultVisibilityTimeout = 10 seconds
delay = 0 seconds
receiveMessageWait = 0 seconds
deadLettersQueue {
name = "queue1-dead-letters"
maxReceiveCount = 3 // from 1 to 1000
}
fifo = false
contentBasedDeduplication = false
tags {
tag1 = "tagged1"
tag2 = "tagged2"
}
}
queue1-dead-letters { }
}

elastic-mq {
region = "elasticMQ"
endPoint = "http://localhost:9325"
access-key-id = "your aws access key id"
secret-access-key = "secret-access-token"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package com.baeldung.elasticmq

import software.amazon.awssdk.auth.credentials.{
AwsBasicCredentials,
AwsCredentialsProviderChain,
StaticCredentialsProvider
}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.model.*
import software.amazon.awssdk.services.sqs.{
SqsAsyncClient,
SqsAsyncClientBuilder
}

import java.net.URI
import java.util.UUID

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters.*
import scala.jdk.CollectionConverters.*

class SQSAsyncClient(
queueURL: String,
region: String,
endpoint: String
)(implicit executionContext: ExecutionContext):

private val sqsAsyncClient: SqsAsyncClient =
SqsAsyncClient
.builder()
.region(Region.of(region))
.credentialsProvider(
AwsCredentialsProviderChain
.builder()
.credentialsProviders(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
ElasticMQConfig.ELASTIC_MQ_ACCESS_KEY,
ElasticMQConfig.ELASTIC_MQ_SECRET_ACCESS_KEY
)
)
)
.build()
)
.endpointOverride(URI.create(endpoint))
.build()

def createStandardQueue(queueName: String): Future[CreateQueueResponse] =
val request = CreateQueueRequest.builder.queueName(queueName).build

sqsAsyncClient.createQueue(request).asScala

final lazy val createFIFOQueueAttributes = Map(
(QueueAttributeName.FIFO_QUEUE, "true")
).asJava

def createFIFOQueue(queueName: String): Future[CreateQueueResponse] =
val createQueueRequest = CreateQueueRequest.builder
.queueName(queueName)
.attributes(createFIFOQueueAttributes)
.build

sqsAsyncClient.createQueue(createQueueRequest).asScala

def deleteQueue(): Future[DeleteQueueResponse] =
val request = DeleteQueueRequest.builder().queueUrl(queueURL).build()

sqsAsyncClient.deleteQueue(request).asScala

def sendMessage(message: String): Future[SendMessageResponse] =
val request = SendMessageRequest
.builder()
.messageBody(message)
.queueUrl(queueURL)
.build()

sqsAsyncClient.sendMessage(request).asScala

def sendMessagesInBatch(
messages: List[String]
): Future[SendMessageBatchResponse] =
val batchRequestEntry = messages
.map(
SendMessageBatchRequestEntry
.builder()
.messageBody(_)
.id(UUID.randomUUID().toString)
.build()
)
.asJava
val sendMessageBatchRequest = SendMessageBatchRequest
.builder()
.queueUrl(queueURL)
.entries(batchRequestEntry)
.build()

sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest).asScala

// maxNumberOfMessages must be less than 10.
def receiveMessages(
maxNumberOfMessages: Int
): Future[ReceiveMessageResponse] =
val receiveMessageRequest =
ReceiveMessageRequest
.builder()
.maxNumberOfMessages(maxNumberOfMessages)
.queueUrl(queueURL)
.waitTimeSeconds(10)
.build()

sqsAsyncClient.receiveMessage(receiveMessageRequest).asScala

def deleteMessage(receiptHandle: String): Future[DeleteMessageResponse] =
val deleteMessageRequest = DeleteMessageRequest
.builder()
.queueUrl(queueURL)
.receiptHandle(receiptHandle)
.build()

sqsAsyncClient.deleteMessage(deleteMessageRequest).asScala

def deleteMessageInBatch(
messages: List[Message]
): Future[DeleteMessageBatchResponse] =
val listDeleteMessageBatchRequestEntry = messages
.map(message =>
DeleteMessageBatchRequestEntry
.builder()
.receiptHandle(message.receiptHandle())
.build()
)
.asJava
val deleteMessageBatchRequest = DeleteMessageBatchRequest
.builder()
.queueUrl(queueURL)
.entries(listDeleteMessageBatchRequestEntry)
.build()

sqsAsyncClient.deleteMessageBatch(deleteMessageBatchRequest).asScala

def getQueueURL(queueName: String): Future[GetQueueUrlResponse] =
val getQueueUrlRequest =
GetQueueUrlRequest.builder().queueName(queueName).build()

sqsAsyncClient.getQueueUrl(getQueueUrlRequest).asScala

def listQueues(): Future[ListQueuesResponse] =
sqsAsyncClient.listQueues().asScala

def listQueuesStartingFromPrefix(prefix: String): Future[ListQueuesResponse] =
val listQueueStartingFromPrefixRequest =
ListQueuesRequest.builder().queueNamePrefix(prefix).build()

sqsAsyncClient.listQueues(listQueueStartingFromPrefixRequest).asScala

def changeMessageVisibility(
message: Message
): Future[ChangeMessageVisibilityResponse] =
val changeMessageVisibilityRequest = ChangeMessageVisibilityRequest
.builder()
.queueUrl(queueURL)
.receiptHandle(message.receiptHandle())
.visibilityTimeout(30)
.build()

sqsAsyncClient
.changeMessageVisibility(changeMessageVisibilityRequest)
.asScala

def changeMessageVisibilityOfBatch(
messages: List[Message]
): Future[ChangeMessageVisibilityBatchResponse] =
val changeMessageVisibilityBatchRequestEntry = messages
.map(message =>
ChangeMessageVisibilityBatchRequestEntry
.builder()
.receiptHandle(message.receiptHandle())
.visibilityTimeout(30)
.build()
)
.asJava
val changeMessageVisibilityRequest = ChangeMessageVisibilityBatchRequest
.builder()
.queueUrl(queueURL)
.entries(changeMessageVisibilityBatchRequestEntry)
.build()

sqsAsyncClient
.changeMessageVisibilityBatch(changeMessageVisibilityRequest)
.asScala

final lazy val purgeQueueRequest =
PurgeQueueRequest.builder().queueUrl(queueURL).build()
def purgeQueue(): Future[PurgeQueueResponse] =
sqsAsyncClient.purgeQueue(purgeQueueRequest).asScala

def setQueueAttributes(
attributes: Map[QueueAttributeName, String]
): Future[SetQueueAttributesResponse] =
val setQueueAttributesRequest = SetQueueAttributesRequest
.builder()
.queueUrl(queueURL)
.attributes(attributes.asJava)
.build()

sqsAsyncClient.setQueueAttributes(setQueueAttributesRequest).asScala

def tagQueue(tags: Map[String, String]): Future[TagQueueResponse] =
val tagQueueRequest =
TagQueueRequest.builder().queueUrl(queueURL).tags(tags.asJava).build()

sqsAsyncClient.tagQueue(tagQueueRequest).asScala

def untagQueue(listOfTagsToRemove: List[String]): Future[UntagQueueResponse] =
val untagQueueRequest = UntagQueueRequest
.builder()
.queueUrl(queueURL)
.tagKeys(listOfTagsToRemove.asJava)
.build()

sqsAsyncClient.untagQueue(untagQueueRequest).asScala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.baeldung.elasticmq

import com.typesafe.config.{Config, ConfigFactory}

object ElasticMQConfig:

private final val config: Config = ConfigFactory.load("elasticmq.conf")

final val ELASTIC_MQ_ACCESS_KEY: String =
config.getString("elastic-mq.access-key-id")
final val ELASTIC_MQ_SECRET_ACCESS_KEY: String =
config.getString("elastic-mq.secret-access-key")

final val ELASTIC_MQ_REGION = config.getString("elastic-mq.region")
final val ELASTIC_MQ_ENDPOINT = config.getString("elastic-mq.endPoint")
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.baeldung.elasticmq

import org.elasticmq.rest.sqs.SQSRestServerBuilder
import org.elasticmq.server.ElasticMQServer
import org.elasticmq.server.config.ElasticMQServerConfig

import com.typesafe.config.ConfigFactory

import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem}
import org.apache.pekko.event.LoggingAdapter

import scala.util.{Failure, Success}

object ElasticMQService extends App:

implicit val actorSystem: ActorSystem = ActorSystem.create()
implicit val executionContext: concurrent.ExecutionContextExecutor =
actorSystem.dispatcher
implicit val m_logger: LoggingAdapter = actorSystem.log

final val ElasticMQ_URL = s"http://localhost:9324/000000000000/"

val endpoint = "http://localhost:9325"
val region = "elasticmq"

val server = SQSRestServerBuilder
.withPort(9325)
.withInterface("localhost")
.start()

val elasticMQClient = new SQSAsyncClient(ElasticMQ_URL, region, endpoint)

val uselessWorkflow =
for
_ <- elasticMQClient.createStandardQueue("standardQueueForTest")
testQueueClient = new SQSAsyncClient(
ElasticMQ_URL + "standardQueueForTest",
region,
endpoint
)
_ <- testQueueClient.createFIFOQueue("fifoQueue.fifo")
_ <- testQueueClient.listQueues()
_ <- testQueueClient.sendMessage("Hi")
_ <- testQueueClient.sendMessagesInBatch(
List("Follow", "Baeldung", "on", "LinkedIn")
)
_ <- testQueueClient.receiveMessages(5)
_ <- testQueueClient.purgeQueue()
yield ()

uselessWorkflow
.andThen(_ => server.stopAndWait())
.onComplete:
case Success(_) => m_logger.info("queue created")
case Failure(exception) =>
m_logger.error(exception, "exception in uselessWorkflow")

0 comments on commit 7be23f8

Please sign in to comment.