
Commit

Merge pull request #16 from cuda-networks/support_queue_prefixes
Support queue prefixes
alexeyts authored Jan 31, 2018
2 parents a872415 + 6341220 commit 80853d2
Showing 6 changed files with 76 additions and 19 deletions.
20 changes: 15 additions & 5 deletions README.md
@@ -25,7 +25,7 @@ from eb_sqs.decorators import task

@task(queue_name='test')
def echo(message):
print message
print(message)

echo.delay(message='Hello World!')
```
@@ -56,10 +56,10 @@ from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def upload_file(message):
print '# of retries: {}'.format(upload_file.retry_num)
print('# of retries: {}'.format(upload_file.retry_num))
try:
# upload ...
expect ConnectionException:
except ConnectionException:
upload_file.retry()
```

@@ -69,7 +69,7 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d
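For instance, a minimal sketch reusing the `upload_file` task from the example above (the values are illustrative):

```python
# Schedule the retry to run 60 seconds from now instead of immediately
upload_file.retry(delay=60)

# Or run the retry synchronously in the current process
upload_file.retry(execute_inline=True)
```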

#### Executing Tasks

The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifing the url mapping in your `urls.py` file.
The Elastic Beanstalk Worker Tier sends all tasks to an API endpoint. django-eb-sqs already provides such an endpoint, which can be used by specifying the url mapping in your `urls.py` file.

```python
urlpatterns = [
@@ -105,9 +105,16 @@ python manage.py process_queue --queues <comma-delimited list of queue names>

This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker.

You can either use full queue names, or a queue prefix using the `prefix:*my_example_prefix*` notation.

Examples:
```bash
python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'
```

#### Group Tasks
Multiple tasks can be grouped by specifing the `group_id` argument when calling `delay` on a task.
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed.

Example calls:
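As a minimal sketch of grouped calls (the task and group id below are hypothetical; `group_id` is the `delay` argument described above):

```python
from eb_sqs.decorators import task

@task(queue_name='test')
def resize_image(image_id):
    print('resizing image {}'.format(image_id))

# Both calls share the same group id; once every task in the group has been
# executed, the callback task named in EB_SQS_GROUP_CALLBACK_TASK is run.
resize_image.delay(image_id=1, group_id='image-batch-1')
resize_image.delay(image_id=2, group_id='image-batch-1')
```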
@@ -134,6 +141,8 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo
- EB_SQS_MAX_NUMBER_OF_MESSAGES (`10`): The maximum number of messages to read in a single call from SQS (<= 10).
- EB_SQS_WAIT_TIME_S (`2`): The time to wait (seconds) when receiving messages from SQS.
- EB_SQS_AUTO_ADD_QUEUE (`False`): If queues should be added automatically to AWS if they don't exist.
- EB_SQS_QUEUE_MESSAGE_RETENTION (`1209600`): The value (in seconds) passed to the MessageRetentionPeriod parameter when creating a queue (only relevant if EB_SQS_AUTO_ADD_QUEUE is set to True).
- EB_SQS_QUEUE_VISIBILITY_TIMEOUT (`300`): The value (in seconds) passed to the VisibilityTimeout parameter when creating a queue (only relevant if EB_SQS_AUTO_ADD_QUEUE is set to True).
- EB_SQS_DEAD_LETTER_MODE (`False`): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed but group callback is.
- EB_SQS_DEFAULT_DELAY (`0`): Default task delay time in seconds.
- EB_SQS_DEFAULT_MAX_RETRIES (`0`): Default retry limit for all tasks.
@@ -148,6 +157,7 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo
- EB_SQS_REDIS_KEY_PREFIX (`eb-sqs-`): Prefix used for all Redis keys
- EB_SQS_USE_PICKLE (`False`): Enable to use `pickle` to serialize task parameters. Uses `json` as default.
- EB_SQS_AWS_MAX_RETRIES (`30`): Default retry limit on a boto3 call to AWS SQS.
- EB_SQS_REFRESH_PREFIX_QUEUES_S (`10`): Minimum number of seconds to wait between refreshes of the queue list when a prefix is used.
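For example, a Django `settings.py` might combine the queue-creation and prefix settings like this (the values shown are illustrative only):

```python
# settings.py (illustrative values)
EB_SQS_AUTO_ADD_QUEUE = True                  # create queues on demand if they don't exist
EB_SQS_QUEUE_MESSAGE_RETENTION = '1209600'    # 14 days, passed to SQS as a string
EB_SQS_QUEUE_VISIBILITY_TIMEOUT = '300'       # 5 minutes, passed to SQS as a string
EB_SQS_REFRESH_PREFIX_QUEUES_S = 30           # re-list prefix-matched queues at most every 30 s
```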


### Development
2 changes: 1 addition & 1 deletion development.txt
@@ -1,4 +1,4 @@
boto3==1.4.7
boto3==1.4.8
Django==1.10.6
mock==2.0.0
moto==0.4.24
8 changes: 7 additions & 1 deletion eb_sqs/aws/sqs_queue_client.py
@@ -42,7 +42,13 @@ def _get_sqs_queue(self, queue_name):
def _add_sqs_queue(self, queue_name):
# type: (unicode) -> Any
if settings.AUTO_ADD_QUEUE:
queue = self.sqs.create_queue(QueueName=queue_name)
queue = self.sqs.create_queue(
QueueName=queue_name,
Attributes={
'MessageRetentionPeriod': settings.QUEUE_MESSAGE_RETENTION,
'VisibilityTimeout': settings.QUEUE_VISIBILITY_TIMEOUT
}
)
self.queue_cache[queue_name] = queue
return queue
else:
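SQS requires queue attribute values to be strings, which is why the new settings default to `'1209600'` and `'300'`. A standalone sketch of the equivalent boto3 call (queue name and region are placeholders):

```python
import boto3

sqs = boto3.resource('sqs', region_name='us-east-1')

# Create a queue with explicit retention and visibility attributes;
# attribute values must be strings per the SQS API.
queue = sqs.create_queue(
    QueueName='my-queue',
    Attributes={
        'MessageRetentionPeriod': '1209600',  # 14 days
        'VisibilityTimeout': '300',           # 5 minutes
    }
)
```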
56 changes: 46 additions & 10 deletions eb_sqs/management/commands/process_queue.py
@@ -1,10 +1,13 @@
from __future__ import absolute_import, unicode_literals

from datetime import timedelta, datetime

import boto3
import logging

from botocore.config import Config
from django.core.management import BaseCommand, CommandError
from django.utils import timezone

from eb_sqs import settings
from eb_sqs.worker.worker import Worker
@@ -16,6 +19,8 @@
class Command(BaseCommand):
help = 'Command to process tasks from one or more SQS queues'

PREFIX_STR = 'prefix:'

def add_arguments(self, parser):
parser.add_argument('--queues', '-q',
dest='queue_names',
@@ -34,23 +39,41 @@ def handle(self, *args, **options):
region_name=settings.AWS_REGION,
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
)
queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))

queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
static_queues = queues
last_update_time = timezone.make_aware(datetime.min)

logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

worker = WorkerFactory.default().create()

while True:
for queue in queues:
messages = queue.receive_messages(
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
WaitTimeSeconds=settings.WAIT_TIME_S,
)

for msg in messages:
self._process_message(msg, worker)
if len(queue_prefixes) > 0 and \
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes)
last_update_time = timezone.now()
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
', '.join([queue.url for queue in queues])
))

def _process_message(self, msg, worker):
for queue in queues:
try:
messages = queue.receive_messages(
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
WaitTimeSeconds=settings.WAIT_TIME_S,
)

for msg in messages:
self._process_message(msg, worker)
except Exception as exc:
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)

@staticmethod
def _process_message(msg, worker):
# type: (Any, Worker) -> None
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
try:
@@ -61,3 +84,16 @@ def _process_message(self, msg, worker):
finally:
msg.delete()
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))

@staticmethod
def _get_queues_by_names(sqs, queue_names):
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

@staticmethod
def _get_queues_by_prefixes(sqs, prefixes):
queues = []

for prefix in prefixes:
queues += sqs.queues.filter(QueueNamePrefix=prefix)

return queues
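The prefix handling relies on the boto3 SQS resource's queue collection. A minimal sketch of the same lookup outside the management command (region, queue name, and prefix are placeholders):

```python
import boto3

sqs = boto3.resource('sqs', region_name='us-east-1')

# Statically named queues are resolved once, up front
static_queues = [sqs.get_queue_by_name(QueueName='queue1')]

# Prefix-matched queues are re-listed periodically (at most every REFRESH_PREFIX_QUEUES_S seconds)
prefix_queues = list(sqs.queues.filter(QueueNamePrefix='pr1-'))

queues = static_queues + prefix_queues
```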
5 changes: 5 additions & 0 deletions eb_sqs/settings.py
@@ -32,3 +32,8 @@
DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False) # type: bool

AWS_MAX_RETRIES = getattr(settings, 'EB_SQS_AWS_MAX_RETRIES', 30) # type: int

REFRESH_PREFIX_QUEUES_S = getattr(settings, 'EB_SQS_REFRESH_PREFIX_QUEUES_S', 10) # type: int

QUEUE_MESSAGE_RETENTION = getattr(settings, 'EB_SQS_QUEUE_MESSAGE_RETENTION', '1209600') # type: str
QUEUE_VISIBILITY_TIMEOUT = getattr(settings, 'EB_SQS_QUEUE_VISIBILITY_TIMEOUT', '300') # type: str
4 changes: 2 additions & 2 deletions setup.py
@@ -6,15 +6,15 @@

setup(
name='django-eb-sqs',
version='0.99',
version='1.0',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
description='A SQS worker implementation for Elastic Beanstalk',
long_description=README,
url='https://github.com/cuda-networks/django-eb-sqs',
install_requires=[
'boto3>=1.4.7',
'boto3>=1.4.8',
'Django>=1.7',
'redis>=2.10',
'requests>=2',
