Skip to content

Commit

Permalink
Merge pull request #39 from cuda-networks/handle_signals_better
Browse files Browse the repository at this point in the history
error handling and db management for signals
  • Loading branch information
alexeyts authored Aug 9, 2019
2 parents 6c45b64 + e5187a9 commit f395b48
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
26 changes: 19 additions & 7 deletions eb_sqs/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ def process_messages(self, queues, worker, static_queues):
messages = self.poll_messages(queue)
logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))

MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)
self._send_signal(MESSAGES_RECEIVED, messages=messages)

msg_entries = []
for msg in messages:
self.process_message(msg, worker)
self._execute_user_code(lambda: self._process_message(msg, worker))
msg_entries.append({
'Id': msg.message_id,
'ReceiptHandle': msg.receipt_handle
})

MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)
self._send_signal(MESSAGES_PROCESSED, messages=messages)

self.delete_messages(queue, msg_entries)

MESSAGES_DELETED.send(sender=self.__class__, messages=messages)
self._send_signal(MESSAGES_DELETED, messages=messages)
except ClientError as exc:
error_code = exc.response.get('Error', {}).get('Code', None)
if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues:
Expand Down Expand Up @@ -124,18 +124,29 @@ def poll_messages(self, queue):
AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE]
)

def process_message(self, msg, worker):
def _send_signal(self, signal, messages):
# type: (django.dispatch.Signal, list) -> None
if signal.has_listeners(sender=self.__class__):
self._execute_user_code(lambda: signal.send(sender=self.__class__, messages=messages))

def _process_message(self, msg, worker):
# type: (Message, Worker) -> None
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
try:
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])

with django_db_management():
worker.execute(msg.body, receive_count)
worker.execute(msg.body, receive_count)

logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
except ExecutionFailedException as exc:
logger.warning('[django-eb-sqs] Handling message {} got error: {}'.format(msg.message_id, repr(exc)))

@staticmethod
def _execute_user_code(function):
# type: (Any) -> None
try:
with django_db_management():
function()
except Exception as exc:
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)

Expand All @@ -153,5 +164,6 @@ def get_queues_by_prefixes(self, sqs, prefixes):
return queues

def write_healthcheck_file(self):
# type: () -> None
with open(settings.HEALTHCHECK_FILE_NAME, 'w') as file:
file.write(timezone.now().isoformat())
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='1.32',
version='1.33',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
Expand Down

0 comments on commit f395b48

Please sign in to comment.