Skip to content

Commit

Permalink
CV2-5589 resolve broken requests to delete messages
Browse files Browse the repository at this point in the history
  • Loading branch information
DGaffney committed Nov 15, 2024
1 parent 86a56f8 commit 4c81ea3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
8 changes: 4 additions & 4 deletions lib/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ def delete_messages(self, messages_with_queues: List[Tuple[schemas.Message, boto
"""
Delete messages in batch mode.
"""
for queue, messages in self.group_deletions(messages_with_queues).items():
for queue_name, messages in self.group_deletions(messages_with_queues).items():
logger.info(f"Deleting messages {messages}")
self.delete_messages_from_queue(queue, messages)
self.delete_messages_from_queue(queue_name, messages)

def delete_messages_from_queue(self, queue, messages: List[schemas.Message]):
def delete_messages_from_queue(self, queue_name, messages: List[schemas.Message]):
"""
Helper to delete a batch of messages from a specific queue.
"""
for i in range(0, len(messages), 10):
batch = messages[i:i + 10]
entries = [{"Id": str(idx), "ReceiptHandle": message.receipt_handle} for idx, message in enumerate(batch)]
queue.delete_messages(Entries=entries)
self.get_or_create_queue(queue_name)[0].delete_messages(Entries=entries)

def delete_message_entry(self, message: schemas.Message, idx: int = 0) -> Dict[str, str]:
"""
Expand Down
11 changes: 6 additions & 5 deletions test/lib/queue/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ def test_receive_messages(self):
mock_queue1 = MagicMock()
mock_queue1.receive_messages.return_value = [
FakeSQSMessage(
receipt_handle="blah",
receipt_handle="blah",
body=json.dumps({"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}})
),
),
FakeSQSMessage(
receipt_handle="blah",
receipt_handle="blah",
body=json.dumps({"body": {"id": 2, "callback_url": "http://example.com", "text": "This is another test"}})
)
]
Expand Down Expand Up @@ -163,7 +163,8 @@ def test_delete_messages_from_queue(self, mock_logger):
FakeSQSMessage(receipt_handle="r1", body=json.dumps({"body": "msg1"})),
FakeSQSMessage(receipt_handle="r2", body=json.dumps({"body": "msg2"}))
]
self.queue.delete_messages_from_queue(self.mock_input_queue, mock_messages)
self.queue.get_or_create_queue = MagicMock(return_value=[self.mock_input_queue])
self.queue.delete_messages_from_queue(self.queue_name_input, mock_messages)
# Check if the correct number of calls to delete_messages were made
self.mock_input_queue.delete_messages.assert_called_once()

Expand Down Expand Up @@ -259,7 +260,7 @@ def test_error_capturing_in_get_response(self, mock_cache_set, mock_cache_get):
}
message = schemas.parse_input_message(message_data)
message.body.content_hash = "test_hash"

# Simulate an error in the process method
self.model.process = MagicMock(side_effect=Exception("Test error"))

Expand Down

0 comments on commit 4c81ea3

Please sign in to comment.