Skip to content

Commit

Permalink
Merge pull request #20 from cuda-networks/sqs_attributes
Browse files Browse the repository at this point in the history
logging abnormal receive count from sqs
  • Loading branch information
alexeyts authored Feb 28, 2018
2 parents 90192af + f2e4e97 commit e26ad1f
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions eb_sqs/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@


class WorkerService(object):
PREFIX_STR = 'prefix:'
_PREFIX_STR = 'prefix:'

_RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount'

def process_queues(self, queue_names):
# type: (list) -> None
Expand All @@ -28,10 +30,10 @@ def process_queues(self, queue_names):
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
)

prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
prefixes = list(filter(lambda qn: qn.startswith(self._PREFIX_STR), queue_names))
queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))

queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
queue_prefixes = [prefix.split(self._PREFIX_STR)[1] for prefix in prefixes]
static_queues = queues
last_update_time = timezone.make_aware(datetime.min)

Expand Down Expand Up @@ -95,12 +97,19 @@ def poll_messages(self, queue):
return queue.receive_messages(
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
WaitTimeSeconds=settings.WAIT_TIME_S,
AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE]
)

def process_message(self, msg, worker):
# type: (Message, Worker) -> None
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
try:
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])
if receive_count > 1:
logger.warning('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}'.format(
receive_count, msg.message_id, msg.body
))

worker.execute(msg.body)
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
except Exception as exc:
Expand Down

0 comments on commit e26ad1f

Please sign in to comment.