From eeec51c28c1cbca334b3c4e8d560f36cf6057811 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Tue, 12 Nov 2024 10:15:48 -0800 Subject: [PATCH] CV2-5589 try locking mechanism --- lib/queue/queue.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/queue/queue.py b/lib/queue/queue.py index f54a286f..b3a5d399 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -1,6 +1,7 @@ import json from typing import List, Dict, Tuple import os +from threading import Lock import boto3 import botocore @@ -18,6 +19,7 @@ def __init__(self): Start a specific queue - must pass input_queue_name. """ self.sqs = self.get_sqs() + self.sqs_lock = Lock() @staticmethod def get_queue_prefix(): @@ -111,14 +113,16 @@ def get_sqs(self) -> boto3.resources.base.ServiceResource: deploy_env = get_environment_setting("DEPLOY_ENV") if deploy_env == "local": logger.info(f"Using ElasticMQ Interface") - return boto3.resource('sqs', - region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'), - endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'), - aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'), - aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x')) + with self.sqs_lock: + return boto3.resource('sqs', + region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'), + endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'), + aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'), + aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x')) else: logger.info(f"Using SQS Interface") - return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION")) + with self.sqs_lock: + return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION")) def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]: """