From 87132fe2b96af3876d53e85960b59679448ab3f8 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 8 Aug 2019 13:06:48 -0700 Subject: [PATCH 1/3] error handling and db management for signals --- eb_sqs/worker/service.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index ec41c08..97db7e8 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -81,21 +81,21 @@ def process_messages(self, queues, worker, static_queues): messages = self.poll_messages(queue) logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages))) - MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages) + self._execute_user_code(lambda: MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)) msg_entries = [] for msg in messages: - self.process_message(msg, worker) + self._execute_user_code(lambda: self._process_message(msg, worker)) msg_entries.append({ 'Id': msg.message_id, 'ReceiptHandle': msg.receipt_handle }) - MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages) + self._execute_user_code(lambda: MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)) self.delete_messages(queue, msg_entries) - MESSAGES_DELETED.send(sender=self.__class__, messages=messages) + self._execute_user_code(lambda: MESSAGES_DELETED.send(sender=self.__class__, messages=messages)) except ClientError as exc: error_code = exc.response.get('Error', {}).get('Code', None) if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues: @@ -124,18 +124,24 @@ def poll_messages(self, queue): AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE] ) - def process_message(self, msg, worker): + def _process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE]) - with django_db_management(): - worker.execute(msg.body, receive_count) + worker.execute(msg.body, receive_count) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) except ExecutionFailedException as exc: logger.warning('[django-eb-sqs] Handling message {} got error: {}'.format(msg.message_id, repr(exc))) + + @staticmethod + def _execute_user_code(function): + # type: (Any) -> None + try: + with django_db_management(): + function() except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) From 2bc07e710ac031326f02c97465da72d1001780d3 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 8 Aug 2019 13:25:28 -0700 Subject: [PATCH 2/3] only execute signal if has receivers --- eb_sqs/worker/service.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 97db7e8..527aec2 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -81,7 +81,7 @@ def process_messages(self, queues, worker, static_queues): messages = self.poll_messages(queue) logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages))) - self._execute_user_code(lambda: MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)) + self._send_signal(MESSAGES_RECEIVED, messages=messages) msg_entries = [] for msg in messages: @@ -91,11 +91,11 @@ def process_messages(self, queues, worker, static_queues): 'ReceiptHandle': msg.receipt_handle }) - self._execute_user_code(lambda: MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)) + self._send_signal(MESSAGES_PROCESSED, messages=messages) self.delete_messages(queue, msg_entries) - self._execute_user_code(lambda: MESSAGES_DELETED.send(sender=self.__class__, messages=messages)) + self._send_signal(MESSAGES_DELETED, messages=messages) except ClientError as exc: error_code = exc.response.get('Error', {}).get('Code', None) if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues: @@ -124,6 +124,11 @@ def poll_messages(self, queue): AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE] ) + def _send_signal(self, signal, messages): + # type: (django.dispatch.Signal, list) -> None + if signal.has_listeners(sender=self.__class__): + self._execute_user_code(lambda: signal.send(sender=self.__class__, messages=messages)) + def _process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) From e5187a9a7a883af0ffa23b33109198674eacb006 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 8 Aug 2019 16:26:43 -0700 Subject: [PATCH 3/3] bump version --- eb_sqs/worker/service.py | 1 + setup.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 527aec2..ddaca9c 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -164,5 +164,6 @@ def get_queues_by_prefixes(self, sqs, prefixes): return queues def write_healthcheck_file(self): + # type: () -> None with open(settings.HEALTHCHECK_FILE_NAME, 'w') as file: file.write(timezone.now().isoformat()) diff --git a/setup.py b/setup.py index 1ff9bfc..5c4c8a1 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='django-eb-sqs', - version='1.32', + version='1.33', package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(),