Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CV2-5589 resolve broken requests to delete messages #125

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/queue/queue.py
Original file line number Diff line number Diff line change
@@ -143,18 +143,18 @@ def delete_messages(self, messages_with_queues: List[Tuple[schemas.Message, boto
"""
Delete messages in batch mode.
"""
for queue, messages in self.group_deletions(messages_with_queues).items():
for queue_name, messages in self.group_deletions(messages_with_queues).items():
logger.info(f"Deleting messages {messages}")
self.delete_messages_from_queue(queue, messages)
self.delete_messages_from_queue(queue_name, messages)

def delete_messages_from_queue(self, queue, messages: List[schemas.Message]):
def delete_messages_from_queue(self, queue_name, messages: List[schemas.Message]):
"""
Helper to delete a batch of messages from a specific queue.
"""
for i in range(0, len(messages), 10):
batch = messages[i:i + 10]
entries = [{"Id": str(idx), "ReceiptHandle": message.receipt_handle} for idx, message in enumerate(batch)]
queue.delete_messages(Entries=entries)
self.get_or_create_queue(queue_name)[0].delete_messages(Entries=entries)

def delete_message_entry(self, message: schemas.Message, idx: int = 0) -> Dict[str, str]:
"""
11 changes: 6 additions & 5 deletions test/lib/queue/test_queue.py
Original file line number Diff line number Diff line change
@@ -112,11 +112,11 @@ def test_receive_messages(self):
mock_queue1 = MagicMock()
mock_queue1.receive_messages.return_value = [
FakeSQSMessage(
receipt_handle="blah",
receipt_handle="blah",
body=json.dumps({"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}})
),
),
FakeSQSMessage(
receipt_handle="blah",
receipt_handle="blah",
body=json.dumps({"body": {"id": 2, "callback_url": "http://example.com", "text": "This is another test"}})
)
]
@@ -163,7 +163,8 @@ def test_delete_messages_from_queue(self, mock_logger):
FakeSQSMessage(receipt_handle="r1", body=json.dumps({"body": "msg1"})),
FakeSQSMessage(receipt_handle="r2", body=json.dumps({"body": "msg2"}))
]
self.queue.delete_messages_from_queue(self.mock_input_queue, mock_messages)
self.queue.get_or_create_queue = MagicMock(return_value=[self.mock_input_queue])
self.queue.delete_messages_from_queue(self.queue_name_input, mock_messages)
# Check if the correct number of calls to delete_messages were made
self.mock_input_queue.delete_messages.assert_called_once()

@@ -259,7 +260,7 @@ def test_error_capturing_in_get_response(self, mock_cache_set, mock_cache_get):
}
message = schemas.parse_input_message(message_data)
message.body.content_hash = "test_hash"

# Simulate an error in the process method
self.model.process = MagicMock(side_effect=Exception("Test error"))