Skip to content

Commit

Permalink
Merge pull request #120 from meedan/cv2-5589-stateless-singular-queue
Browse files Browse the repository at this point in the history
refactor to get rid of persistent queues and change to singular lookup
  • Loading branch information
DGaffney authored Nov 14, 2024
2 parents f08180b + ab01501 commit 6bda299
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
12 changes: 2 additions & 10 deletions lib/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def __init__(self):
Start a specific queue - must pass input_queue_name.
"""
self.sqs_lock = Lock()
self.sqs = self.get_sqs()

@staticmethod
def get_queue_prefix():
Expand Down Expand Up @@ -82,7 +81,7 @@ def create_queue(self, queue_name: str) -> boto3.resources.base.ServiceResource:
# Optionally enable content-based deduplication for FIFO queues
attributes['ContentBasedDeduplication'] = 'true'
# Include other FIFO-specific attributes as needed
return self.sqs.create_queue(
return self.get_sqs().create_queue(
QueueName=queue_name,
Attributes=attributes
)
Expand All @@ -92,14 +91,7 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.Ser
Initialize all queues for the given worker - try to create them if they are not found by name for whatever reason
"""
try:
found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)]
exact_match_queues = [queue for queue in found_queues if queue.attributes['QueueArn'].split(':')[-1] == queue_name]
logger.info(f"found queues are {found_queues}")
logger.info(f"exact queues are {exact_match_queues}")
if exact_match_queues:
return exact_match_queues
else:
return [self.create_queue(queue_name)]
return [self.get_sqs().get_queue_by_name(QueueName=queue_name)]
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "AWS.SimpleQueueService.NonExistentQueue":
return [self.create_queue(queue_name)]
Expand Down
2 changes: 1 addition & 1 deletion test/lib/queue/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def setUp(self, mock_get_env_setting, mock_boto_resource):
self.mock_sqs_resource = MagicMock()
self.mock_input_queue = MagicMock()
self.mock_input_queue.url = "http://queue/mean_tokens__Model"
self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue]
self.mock_sqs_resource.get_queue_by_name.return_value = self.mock_input_queue
mock_boto_resource.return_value = self.mock_sqs_resource

# Initialize the QueueProcessor instance
Expand Down
6 changes: 5 additions & 1 deletion test/lib/queue/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ def setUp(self, mock_log_execution_time, mock_get_env_setting, mock_boto_resourc
self.mock_dlq_queue = MagicMock()
self.mock_dlq_queue.url = f"http://queue/{self.queue_name_dlq}"
self.mock_dlq_queue.attributes = {"QueueArn": f"queue:{self.queue_name_dlq}"}
self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue, self.mock_output_queue, self.mock_dlq_queue]
self.mock_sqs_resource.get_queue_by_name.side_effect = lambda QueueName: {
self.queue_name_input: self.mock_input_queue,
self.queue_name_output: self.mock_output_queue,
self.queue_name_dlq: self.mock_dlq_queue
}.get(QueueName)
mock_boto_resource.return_value = self.mock_sqs_resource

# Initialize the QueueWorker instance
Expand Down

0 comments on commit 6bda299

Please sign in to comment.