From 57b572c9af6ff28faacf3df0c20bd8e102ffbc3e Mon Sep 17 00:00:00 2001 From: Dmitrii Azarenko Date: Mon, 24 Jul 2023 17:14:10 +0400 Subject: [PATCH] There are situations in which clients are limited in the rights to create queues, i.e. queues are created manually on the DevOps side, and clients are given only read-write permissions to these queues. The previous implementation of queue registration leads to an error: ``` An error occurred (AccessDenied) when calling the CreateQueue operation: User: arn:aws:sts::*** is not authorized to perform: sqs:createqueue on resource: arn:aws:sqs:us-west-2:*** because no identity-based policy allows the sqs:createqueue action ``` To avoid this error, it is suggested to first try to get this queue and if it does not exist, create it. --- dramatiq_sqs/broker.py | 53 ++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/dramatiq_sqs/broker.py b/dramatiq_sqs/broker.py index d329471..db9778c 100644 --- a/dramatiq_sqs/broker.py +++ b/dramatiq_sqs/broker.py @@ -104,35 +104,42 @@ def declare_queue(self, queue_name: str) -> None: } self.emit_before("declare_queue", queue_name) - self.queues[queue_name] = self.sqs.create_queue( - QueueName=prefixed_queue_name, - Attributes={ - "MessageRetentionPeriod": self.retention, - } - ) - if self.tags: - self.sqs.meta.client.tag_queue( - QueueUrl=self.queues[queue_name].url, - Tags=self.tags - ) - if self.dead_letter: - dead_letter_queue_name = f"{prefixed_queue_name}_dlq" - dead_letter_queue = self.sqs.create_queue( - QueueName=dead_letter_queue_name + try: + self.queues[queue_name] = self.sqs.get_queue_by_name( + QueueName=prefixed_queue_name, + ) + except self.sqs.meta.client.exceptions.QueueDoesNotExist: + self.queues[queue_name] = self.sqs.create_queue( + QueueName=prefixed_queue_name, + Attributes={ + "MessageRetentionPeriod": self.retention, + } ) if self.tags: self.sqs.meta.client.tag_queue( - QueueUrl=dead_letter_queue.url, + QueueUrl=self.queues[queue_name].url, Tags=self.tags ) - redrive_policy = { - "deadLetterTargetArn": dead_letter_queue.attributes["QueueArn"], - "maxReceiveCount": str(self.max_receives) - } - self.queues[queue_name].set_attributes(Attributes={ - "RedrivePolicy": json.dumps(redrive_policy) - }) + + if self.dead_letter: + dead_letter_queue_name = f"{prefixed_queue_name}_dlq" + dead_letter_queue = self.sqs.create_queue( + QueueName=dead_letter_queue_name + ) + if self.tags: + self.sqs.meta.client.tag_queue( + QueueUrl=dead_letter_queue.url, + Tags=self.tags + ) + redrive_policy = { + "deadLetterTargetArn": dead_letter_queue.attributes["QueueArn"], + "maxReceiveCount": str(self.max_receives) + } + self.queues[queue_name].set_attributes(Attributes={ + "RedrivePolicy": json.dumps(redrive_policy) + }) + self.emit_after("declare_queue", queue_name) def enqueue(self, message: dramatiq.Message, *, delay: Optional[int] = None) -> dramatiq.Message: