Skip to content

Commit

Permalink
Merge pull request #118 from meedan/cv2-5589-boto-locks
Browse files Browse the repository at this point in the history
CV2-5589 try locking mechanism
  • Loading branch information
DGaffney authored Nov 12, 2024
2 parents 5b3db09 + 0362187 commit ba72953
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions lib/queue/queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from typing import List, Dict, Tuple
import os
from threading import Lock

import boto3
import botocore
Expand All @@ -17,6 +18,7 @@ def __init__(self):
"""
Start a specific queue - must pass input_queue_name.
"""
self.sqs_lock = Lock()
self.sqs = self.get_sqs()

@staticmethod
Expand Down Expand Up @@ -111,14 +113,16 @@ def get_sqs(self) -> boto3.resources.base.ServiceResource:
deploy_env = get_environment_setting("DEPLOY_ENV")
if deploy_env == "local":
logger.info(f"Using ElasticMQ Interface")
return boto3.resource('sqs',
region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'),
endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'),
aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'),
aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x'))
with self.sqs_lock:
return boto3.resource('sqs',
region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'),
endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'),
aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'),
aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x'))
else:
logger.info(f"Using SQS Interface")
return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION"))
with self.sqs_lock:
return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION"))

def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]:
"""
Expand Down

0 comments on commit ba72953

Please sign in to comment.