From 9ae81b1fa32a4a4e0adcf83d0326b8de33e92e16 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 27 Feb 2018 16:01:44 -0800 Subject: [PATCH 1/5] testing --- eb_sqs/worker/service.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 87a45c9..3dd88db 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -28,6 +28,12 @@ def process_queues(self, queue_names): config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) ) + sqs_client = boto3.client( + 'sqs', + region_name=settings.AWS_REGION, + config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) + ) + prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names)) queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) @@ -101,8 +107,11 @@ def process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: - worker.execute(msg.body) + # worker.execute(msg.body) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) + + msg.load() + print(msg.attributes) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) From a698467cbb532e439eadee7c36f6e4a9bd19492d Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 27 Feb 2018 16:15:43 -0800 Subject: [PATCH 2/5] testing --- eb_sqs/worker/service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 3dd88db..be889ea 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -5,6 +5,7 @@ import boto3 import logging +from boto.sqs.message import Message from botocore.config import Config from django.utils import timezone @@ -101,6 +102,7 @@ def poll_messages(self, queue): return queue.receive_messages( MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, WaitTimeSeconds=settings.WAIT_TIME_S, + AttributeNames=['All'] ) def process_message(self, msg, worker): @@ -110,7 +112,6 @@ def process_message(self, msg, worker): # worker.execute(msg.body) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) - msg.load() print(msg.attributes) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) From ddef01910a62ff337c05da044aad4aa03193ffa0 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 27 Feb 2018 16:53:19 -0800 Subject: [PATCH 3/5] testing --- eb_sqs/worker/service.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index be889ea..3e18b13 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -5,7 +5,6 @@ import boto3 import logging -from boto.sqs.message import Message from botocore.config import Config from django.utils import timezone @@ -17,7 +16,9 @@ class WorkerService(object): - PREFIX_STR = 'prefix:' + _PREFIX_STR = 'prefix:' + + _RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount' def process_queues(self, queue_names): # type: (list) -> None @@ -29,16 +30,10 @@ def process_queues(self, queue_names): config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) ) - sqs_client = boto3.client( - 'sqs', - region_name=settings.AWS_REGION, - config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) - ) - - prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names)) + prefixes = list(filter(lambda qn: qn.startswith(self._PREFIX_STR), queue_names)) queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) - queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes] + queue_prefixes = [prefix.split(self._PREFIX_STR)[1] for prefix in prefixes] static_queues = queues last_update_time = timezone.make_aware(datetime.min) @@ -102,7 +97,7 @@ def poll_messages(self, queue): return queue.receive_messages( MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, WaitTimeSeconds=settings.WAIT_TIME_S, - AttributeNames=['All'] + AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE] ) def process_message(self, msg, worker): @@ -110,6 +105,8 @@ def process_message(self, msg, worker): logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: # worker.execute(msg.body) + from time import sleep + sleep(10) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) print(msg.attributes) From 4db9e1912c683cd93e78afb6f2b575258224e56b Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 27 Feb 2018 17:10:13 -0800 Subject: [PATCH 4/5] adding receive count logging and cleanup --- eb_sqs/worker/service.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 3e18b13..38eaa97 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -104,12 +104,14 @@ def process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: - # worker.execute(msg.body) - from time import sleep - sleep(10) + worker.execute(msg.body) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) - print(msg.attributes) + receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE]) + if receive_count > 1: + logger.warning('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}'.format( + receive_count, msg.message_id, msg.body + )) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) From f2e4e9755ca85ba3ed91ca80ad786ae34f4f83fb Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 27 Feb 2018 17:12:17 -0800 Subject: [PATCH 5/5] change log order --- eb_sqs/worker/service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 38eaa97..6cd3091 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -104,14 +104,14 @@ def process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: - worker.execute(msg.body) - logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) - receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE]) if receive_count > 1: logger.warning('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}'.format( receive_count, msg.message_id, msg.body )) + + worker.execute(msg.body) + logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)