Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor to get rid of persistent queues and change to singular lookup #120

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions lib/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def __init__(self):
Start a specific queue - must pass input_queue_name.
"""
self.sqs_lock = Lock()
self.sqs = self.get_sqs()

@staticmethod
def get_queue_prefix():
Expand Down Expand Up @@ -82,7 +81,7 @@ def create_queue(self, queue_name: str) -> boto3.resources.base.ServiceResource:
# Optionally enable content-based deduplication for FIFO queues
attributes['ContentBasedDeduplication'] = 'true'
# Include other FIFO-specific attributes as needed
return self.sqs.create_queue(
return self.get_sqs().create_queue(
QueueName=queue_name,
Attributes=attributes
)
Expand All @@ -92,14 +91,7 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.Ser
Initialize all queues for the given worker - try to create them if they are not found by name for whatever reason
"""
try:
found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)]
exact_match_queues = [queue for queue in found_queues if queue.attributes['QueueArn'].split(':')[-1] == queue_name]
logger.info(f"found queues are {found_queues}")
logger.info(f"exact queues are {exact_match_queues}")
if exact_match_queues:
return exact_match_queues
else:
return [self.create_queue(queue_name)]
return [self.get_sqs().get_queue_by_name(QueueName=queue_name)]
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "AWS.SimpleQueueService.NonExistentQueue":
return [self.create_queue(queue_name)]
Expand Down
2 changes: 1 addition & 1 deletion test/lib/queue/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def setUp(self, mock_get_env_setting, mock_boto_resource):
self.mock_sqs_resource = MagicMock()
self.mock_input_queue = MagicMock()
self.mock_input_queue.url = "http://queue/mean_tokens__Model"
self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue]
self.mock_sqs_resource.get_queue_by_name.return_value = self.mock_input_queue
mock_boto_resource.return_value = self.mock_sqs_resource

# Initialize the QueueProcessor instance
Expand Down
6 changes: 5 additions & 1 deletion test/lib/queue/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ def setUp(self, mock_log_execution_time, mock_get_env_setting, mock_boto_resourc
self.mock_dlq_queue = MagicMock()
self.mock_dlq_queue.url = f"http://queue/{self.queue_name_dlq}"
self.mock_dlq_queue.attributes = {"QueueArn": f"queue:{self.queue_name_dlq}"}
self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue, self.mock_output_queue, self.mock_dlq_queue]
self.mock_sqs_resource.get_queue_by_name.side_effect = lambda QueueName: {
self.queue_name_input: self.mock_input_queue,
self.queue_name_output: self.mock_output_queue,
self.queue_name_dlq: self.mock_dlq_queue
}.get(QueueName)
mock_boto_resource.return_value = self.mock_sqs_resource

# Initialize the QueueWorker instance
Expand Down
Loading