From a011acf8671d36b0041acf250ed20caf6517695d Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Mon, 29 Jan 2018 13:22:45 -0800 Subject: [PATCH 1/6] support monitoring queues by prefix --- development.txt | 2 +- eb_sqs/management/commands/process_queue.py | 56 +++++++++++++++++---- eb_sqs/settings.py | 2 + setup.py | 4 +- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/development.txt b/development.txt index 8263adb..1ad5f65 100644 --- a/development.txt +++ b/development.txt @@ -1,4 +1,4 @@ -boto3==1.4.7 +boto3==1.4.8 Django==1.10.6 mock==2.0.0 moto==0.4.24 diff --git a/eb_sqs/management/commands/process_queue.py b/eb_sqs/management/commands/process_queue.py index dddb016..b561d11 100644 --- a/eb_sqs/management/commands/process_queue.py +++ b/eb_sqs/management/commands/process_queue.py @@ -1,10 +1,13 @@ from __future__ import absolute_import, unicode_literals +from datetime import timedelta, datetime + import boto3 import logging from botocore.config import Config from django.core.management import BaseCommand, CommandError +from django.utils import timezone from eb_sqs import settings from eb_sqs.worker.worker import Worker @@ -16,6 +19,8 @@ class Command(BaseCommand): help = 'Command to process tasks from one or more SQS queues' + PREFIX_STR = 'prefix:' + def add_arguments(self, parser): parser.add_argument('--queues', '-q', dest='queue_names', @@ -34,23 +39,41 @@ def handle(self, *args, **options): region_name=settings.AWS_REGION, config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) ) - queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names] + + prefixes = list(filter(lambda qn: qn.startsWith(self.PREFIX_STR), queue_names)) + queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) + + queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes] + static_queues = queues + last_update_time = timezone.make_aware(datetime.min) logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names))) worker = WorkerFactory.default().create() while True: - for queue in queues: - messages = queue.receive_messages( - MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, - WaitTimeSeconds=settings.WAIT_TIME_S, - ) - - for msg in messages: - self._process_message(msg, worker) + if len(queue_prefixes) > 0 and \ + timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time: + queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes) + last_update_time = timezone.now() + logger.debug('[django-eb-sqs] Updated SQS queues: {}'.format( + ', '.join([queue.url for queue in queues]) + )) - def _process_message(self, msg, worker): + for queue in queues: + try: + messages = queue.receive_messages( + MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, + WaitTimeSeconds=settings.WAIT_TIME_S, + ) + + for msg in messages: + self._process_message(msg, worker) + except Exception as exc: + logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) + + @staticmethod + def _process_message(msg, worker): # type: (Any, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: @@ -61,3 +84,16 @@ def _process_message(self, msg, worker): finally: msg.delete() logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id)) + + @staticmethod + def _get_queues_by_names(sqs, queue_names): + return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names] + + @staticmethod + def _get_queues_by_prefixes(sqs, prefixes): + queues = [] + + for prefix in prefixes: + queues += sqs.queues.filter(QueueNamePrefix=prefix) + + return queues diff --git a/eb_sqs/settings.py b/eb_sqs/settings.py index d7ce284..485a97d 100644 --- a/eb_sqs/settings.py +++ b/eb_sqs/settings.py @@ -32,3 +32,5 @@ DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False) # type: bool AWS_MAX_RETRIES = getattr(settings, 'EB_SQS_AWS_MAX_RETRIES', 30) # type: int + +REFRESH_PREFIX_QUEUES_S = getattr(settings, 'EB_SQS_REFRESH_PREFIX_QUEUES_S', 10) # type: int diff --git a/setup.py b/setup.py index ea3d7f8..26001bf 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='django-eb-sqs', - version='0.99', + version='1.0', package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(), @@ -14,7 +14,7 @@ long_description=README, url='https://github.com/cuda-networks/django-eb-sqs', install_requires=[ - 'boto3>=1.4.7', + 'boto3>=1.4.8', 'Django>=1.7', 'redis>=2.10', 'requests>=2', From aeedaa8d79e516a5e6b0496a0a947a6c3497d014 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Mon, 29 Jan 2018 13:29:19 -0800 Subject: [PATCH 2/6] bug fix --- eb_sqs/management/commands/process_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eb_sqs/management/commands/process_queue.py b/eb_sqs/management/commands/process_queue.py index b561d11..406d770 100644 --- a/eb_sqs/management/commands/process_queue.py +++ b/eb_sqs/management/commands/process_queue.py @@ -40,7 +40,7 @@ def handle(self, *args, **options): config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) ) - prefixes = list(filter(lambda qn: qn.startsWith(self.PREFIX_STR), queue_names)) + prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names)) queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes] From b90202a6a85b757ceeb622f3af9d731cf0e71435 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Mon, 29 Jan 2018 13:56:12 -0800 Subject: [PATCH 3/6] updating readme --- README.md | 8 ++++++++ eb_sqs/management/commands/process_queue.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0624695..e9b263f 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,13 @@ python manage.py process_queue --queues This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker. +You can either use full queue names, or queue prefix using `prefix:*my_example_prefix*` notation. + +Examples: +```bash +python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2 +python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-' +``` #### Group Tasks Multiple tasks can be grouped by specifing the `group_id` argument when calling `delay` on a task. @@ -148,6 +155,7 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo - EB_SQS_REDIS_KEY_PREFIX (`eb-sqs-`): Prefix used for all Redis keys - EB_SQS_USE_PICKLE (`False`): Enable to use `pickle` to serialize task parameters. Uses `json` as default. - EB_SQS_AWS_MAX_RETRIES (`30`): Default retry limit on a boto3 call to AWS SQS. +- EB_SQS_REFRESH_PREFIX_QUEUES_S (`10`): Minimal number of seconds to wait between refreshing queue list, in case prefix is used ### Development diff --git a/eb_sqs/management/commands/process_queue.py b/eb_sqs/management/commands/process_queue.py index 406d770..e86ac1e 100644 --- a/eb_sqs/management/commands/process_queue.py +++ b/eb_sqs/management/commands/process_queue.py @@ -56,7 +56,7 @@ def handle(self, *args, **options): timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time: queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes) last_update_time = timezone.now() - logger.debug('[django-eb-sqs] Updated SQS queues: {}'.format( + logger.info('[django-eb-sqs] Updated SQS queues: {}'.format( ', '.join([queue.url for queue in queues]) )) From 57db87bf030f5350427356c3ba6766c333018db0 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 30 Jan 2018 09:56:00 -0800 Subject: [PATCH 4/6] updating created queue params --- eb_sqs/aws/sqs_queue_client.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/eb_sqs/aws/sqs_queue_client.py b/eb_sqs/aws/sqs_queue_client.py index 7cd7c94..954e936 100644 --- a/eb_sqs/aws/sqs_queue_client.py +++ b/eb_sqs/aws/sqs_queue_client.py @@ -42,7 +42,13 @@ def _get_sqs_queue(self, queue_name): def _add_sqs_queue(self, queue_name): # type: (unicode) -> Any if settings.AUTO_ADD_QUEUE: - queue = self.sqs.create_queue(QueueName=queue_name) + queue = self.sqs.create_queue( + QueueName=queue_name, + Attributes={ + 'MessageRetentionPeriod': 1209600, + 'VisibilityTimeout': 300 + } + ) self.queue_cache[queue_name] = queue return queue else: From 819bf63c511805423de0a1c2a1c6f7f157a6aa8b Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 30 Jan 2018 09:58:42 -0800 Subject: [PATCH 5/6] bug fix --- eb_sqs/aws/sqs_queue_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eb_sqs/aws/sqs_queue_client.py b/eb_sqs/aws/sqs_queue_client.py index 954e936..96d504e 100644 --- a/eb_sqs/aws/sqs_queue_client.py +++ b/eb_sqs/aws/sqs_queue_client.py @@ -45,8 +45,8 @@ def _add_sqs_queue(self, queue_name): queue = self.sqs.create_queue( QueueName=queue_name, Attributes={ - 'MessageRetentionPeriod': 1209600, - 'VisibilityTimeout': 300 + 'MessageRetentionPeriod': '1209600', + 'VisibilityTimeout': '300' } ) self.queue_cache[queue_name] = queue From 63412201d95dbec6acfa433d979d5330db5f54d0 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Tue, 30 Jan 2018 16:40:24 -0800 Subject: [PATCH 6/6] fixed pr comment --- README.md | 12 +++++++----- eb_sqs/aws/sqs_queue_client.py | 4 ++-- eb_sqs/settings.py | 3 +++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e9b263f..f853b54 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ from eb_sqs.decorators import task @task(queue_name='test') def echo(message): - print message + print(message) echo.delay(message='Hello World!') ``` @@ -56,10 +56,10 @@ from eb_sqs.decorators import task @task(queue_name='test', max_retries=5) def upload_file(message): - print '# of retries: {}'.format(upload_file.retry_num) + print('# of retries: {}'.format(upload_file.retry_num)) try: # upload ... - expect ConnectionException: + except ConnectionException: upload_file.retry() ``` @@ -69,7 +69,7 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d #### Executing Tasks -The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifing the url mapping in your `urls.py` file. +The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifying the url mapping in your `urls.py` file. ```python urlpatterns = [ @@ -114,7 +114,7 @@ python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queu ``` #### Group Tasks -Multiple tasks can be grouped by specifing the `group_id` argument when calling `delay` on a task. +Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task. If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed. Example calls: @@ -141,6 +141,8 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo - EB_SQS_MAX_NUMBER_OF_MESSAGES (`10`): The maximum number of messages to read in a single call from SQS (<= 10). - EB_SQS_WAIT_TIME_S (`2`): The time to wait (seconds) when receiving messages from SQS. - EB_SQS_AUTO_ADD_QUEUE (`False`): If queues should be added automatically to AWS if they don't exist. +- EB_SQS_QUEUE_MESSAGE_RETENTION (`1209600`): The value (in seconds) to be passed to MessageRetentionPeriod parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True). +- EB_SQS_QUEUE_VISIBILITY_TIMEOUT (`300`): The value (in seconds) to be passed to VisibilityTimeout parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True). - EB_SQS_DEAD_LETTER_MODE (`False`): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed but group callback is. - EB_SQS_DEFAULT_DELAY (`0`): Default task delay time in seconds. - EB_SQS_DEFAULT_MAX_RETRIES (`0`): Default retry limit for all tasks. diff --git a/eb_sqs/aws/sqs_queue_client.py b/eb_sqs/aws/sqs_queue_client.py index 96d504e..461a17c 100644 --- a/eb_sqs/aws/sqs_queue_client.py +++ b/eb_sqs/aws/sqs_queue_client.py @@ -45,8 +45,8 @@ def _add_sqs_queue(self, queue_name): queue = self.sqs.create_queue( QueueName=queue_name, Attributes={ - 'MessageRetentionPeriod': '1209600', - 'VisibilityTimeout': '300' + 'MessageRetentionPeriod': settings.QUEUE_MESSAGE_RETENTION, + 'VisibilityTimeout': settings.QUEUE_VISIBILITY_TIMEOUT } ) self.queue_cache[queue_name] = queue diff --git a/eb_sqs/settings.py b/eb_sqs/settings.py index 485a97d..291ff7f 100644 --- a/eb_sqs/settings.py +++ b/eb_sqs/settings.py @@ -34,3 +34,6 @@ AWS_MAX_RETRIES = getattr(settings, 'EB_SQS_AWS_MAX_RETRIES', 30) # type: int REFRESH_PREFIX_QUEUES_S = getattr(settings, 'EB_SQS_REFRESH_PREFIX_QUEUES_S', 10) # type: int + +QUEUE_MESSAGE_RETENTION = getattr(settings, 'EB_SQS_QUEUE_MESSAGE_RETENTION', '1209600') # type: str +QUEUE_VISIBILITY_TIMEOUT = getattr(settings, 'EB_SQS_QUEUE_VISIBILITY_TIMEOUT', '300') # type: str