diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 87a45c9..6cd3091 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -16,7 +16,9 @@ class WorkerService(object): - PREFIX_STR = 'prefix:' + _PREFIX_STR = 'prefix:' + + _RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount' def process_queues(self, queue_names): # type: (list) -> None @@ -28,10 +30,10 @@ def process_queues(self, queue_names): config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) ) - prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names)) + prefixes = list(filter(lambda qn: qn.startswith(self._PREFIX_STR), queue_names)) queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) - queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes] + queue_prefixes = [prefix.split(self._PREFIX_STR)[1] for prefix in prefixes] static_queues = queues last_update_time = timezone.make_aware(datetime.min) @@ -95,12 +97,19 @@ def poll_messages(self, queue): return queue.receive_messages( MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, WaitTimeSeconds=settings.WAIT_TIME_S, + AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE] ) def process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: + receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE]) + if receive_count > 1: + logger.warning('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}'.format( + receive_count, msg.message_id, msg.body + )) + worker.execute(msg.body) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) except Exception as exc: