diff --git a/lib/queue/queue.py b/lib/queue/queue.py index f54a286f..464bc1ed 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -1,6 +1,7 @@ import json from typing import List, Dict, Tuple import os +from threading import Lock import boto3 import botocore @@ -17,6 +18,7 @@ def __init__(self): """ Start a specific queue - must pass input_queue_name. """ + self.sqs_lock = Lock() self.sqs = self.get_sqs() @staticmethod @@ -111,14 +113,16 @@ def get_sqs(self) -> boto3.resources.base.ServiceResource: deploy_env = get_environment_setting("DEPLOY_ENV") if deploy_env == "local": logger.info(f"Using ElasticMQ Interface") - return boto3.resource('sqs', - region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'), - endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'), - aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'), - aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x')) + with self.sqs_lock: + return boto3.resource('sqs', + region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'), + endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'), + aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'), + aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x')) else: logger.info(f"Using SQS Interface") - return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION")) + with self.sqs_lock: + return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION")) def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]: """