From 15082db62a3f6dcce3ab4f560287c042899acab5 Mon Sep 17 00:00:00 2001 From: Rohan Patil Date: Thu, 9 Jan 2020 12:07:06 -0800 Subject: [PATCH 1/5] BNCASB-2204: Initial changes to clean the project (removing group client and unused code) Initial changes to clean the project (removing group client and unused code) --- README.md | 49 +-------- .../management/commands/run_eb_sqs_worker.py | 79 --------------- eb_sqs/redis/__init__.py | 1 - eb_sqs/redis/redis_group_client.py | 53 ---------- eb_sqs/tests/tests_views.py | 45 --------- eb_sqs/tests/worker/tests_worker.py | 99 +------------------ eb_sqs/urls.py | 9 -- eb_sqs/views.py | 23 ----- eb_sqs/worker/group_client.py | 31 ------ eb_sqs/worker/sqs_redis_worker_factory.py | 19 ---- eb_sqs/worker/sqs_worker_factory.py | 17 ++++ eb_sqs/worker/worker.py | 64 +----------- eb_sqs/worker/worker_factory.py | 4 +- 13 files changed, 24 insertions(+), 469 deletions(-) delete mode 100644 eb_sqs/management/commands/run_eb_sqs_worker.py delete mode 100644 eb_sqs/redis/__init__.py delete mode 100644 eb_sqs/redis/redis_group_client.py delete mode 100644 eb_sqs/tests/tests_views.py delete mode 100644 eb_sqs/urls.py delete mode 100644 eb_sqs/views.py delete mode 100644 eb_sqs/worker/group_client.py delete mode 100644 eb_sqs/worker/sqs_redis_worker_factory.py create mode 100644 eb_sqs/worker/sqs_worker_factory.py diff --git a/README.md b/README.md index f2fd2a7..70f96e9 100644 --- a/README.md +++ b/README.md @@ -67,33 +67,6 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d **NOTE:** `retry()` throws a `MaxRetriesReachedException` exception if the maximum number of retries is reached. -#### Executing Tasks - -The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifying the url mapping in your `urls.py` file. - -```python -urlpatterns = [ - ... - url(r'^worker/', include('eb_sqs.urls', namespace='eb_sqs')) -] -``` - -In that case the relative endpoint url would be: `worker/process` - -Set this url in the Elastic Beanstalk Worker settings prior to deployment. - -During development you can use the included Django command to execute a small script which retrieves messages from SQS and posts them to this endpoint. - -```bash -python manage.py run_eb_sqs_worker --url --queue -``` - -For example: - -```bash -python manage.py run_eb_sqs_worker --url http://127.0.0.1:80/worker/process --queue default -``` - #### Executing Tasks without Elastic Beanstalk Another way of executing tasks is to use the Django command `process_queue`. @@ -113,27 +86,7 @@ python manage.py process_queue --queues queue1,queue2 # process queue1 and queue python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-' ``` -Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of the `WorkerService` to get informed about the current SQS batch being processed by the management command. - -#### Group Tasks -Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task. -If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed. - -Example calls: -```python -echo.delay(message='Hello World!', group_id='1') -echo.delay(message='Hallo Welt!', group_id='1') -echo.delay(message='Hola mundo!', group_id='1') -``` - -Example callback which is executed when all three tasks are finished: -```python -from eb_sqs.decorators import task - -@task(queue_name='test', max_retries=5) -def group_finished(group_id): - pass -``` +Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of #### Auto Tasks diff --git a/eb_sqs/management/commands/run_eb_sqs_worker.py b/eb_sqs/management/commands/run_eb_sqs_worker.py deleted file mode 100644 index c93abf6..0000000 --- a/eb_sqs/management/commands/run_eb_sqs_worker.py +++ /dev/null @@ -1,79 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -import boto3 -from botocore.config import Config -from django.conf import settings -from django.core.management import BaseCommand, CommandError -import requests -from requests.exceptions import ConnectionError - - -class Command(BaseCommand): - help = "Command to run the EB SQS worker" - - def add_arguments(self, parser): - parser.add_argument('--url', '-u', - dest='url', - help='Url of the worker endpoint API') - - parser.add_argument('--queue', '-q', - dest='queue_name', - help='Name of the queue to process') - - parser.add_argument('--retries', '-r', - dest='retry_limit', - default=10, - help='Retry limit until the message is discarded (default 10)') - - def handle(self, *args, **options): - if not options['url']: - raise CommandError('Worker endpoint url parameter (--url) not found') - - if not options['queue_name']: - raise CommandError('Queue name (--queue) not specified') - - url = options['url'] - queue_name = options['queue_name'] - retry_limit = max(int(options['retry_limit']), 1) - - try: - self.stdout.write('Connect to SQS') - sqs = boto3.resource( - 'sqs', - region_name=settings.AWS_REGION, - config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) - ) - queue = sqs.get_queue_by_name(QueueName=queue_name) - self.stdout.write('> Connected') - - while True: - messages = queue.receive_messages( - MaxNumberOfMessages=1, - WaitTimeSeconds=20 - ) - - if len(messages) == 0: - break - - for msg in messages: - self.stdout.write('Deliver message {}'.format(msg.message_id)) - if self._process_message_with_retry(url, retry_limit, msg): - self.stdout.write('> Delivered') - else: - self.stdout.write('> Delivery failed (retry-limit reached)') - msg.delete() - - self.stdout.write('Message processing finished') - except ConnectionError: - self.stdout.write('Connection to {} failed. Message processing failed'.format(url)) - - def _process_message_with_retry(self, url, retry_limit, message): - for retry in range(retry_limit): - if self._process_message(url, message): - return True - - return False - - def _process_message(self, url, message): - response = requests.post(url, data=message.body) - return response.status_code == 200 diff --git a/eb_sqs/redis/__init__.py b/eb_sqs/redis/__init__.py deleted file mode 100644 index 01e6d4f..0000000 --- a/eb_sqs/redis/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from __future__ import absolute_import, unicode_literals diff --git a/eb_sqs/redis/redis_group_client.py b/eb_sqs/redis/redis_group_client.py deleted file mode 100644 index cdd7f40..0000000 --- a/eb_sqs/redis/redis_group_client.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from redis import StrictRedis - -from eb_sqs import settings -from eb_sqs.worker.group_client import GroupClient -from eb_sqs.worker.worker_task import WorkerTask - - -class RedisGroupClient(GroupClient): - def __init__(self, redis_client): - # type: (StrictRedis) -> None - super(RedisGroupClient, self).__init__() - self._redis_client = redis_client - - def _key_name(self, group_id): - # type: (unicode) -> None - return '{}{}'.format(settings.REDIS_KEY_PREFIX, group_id) - - def _task_identifier(self, worker_task): - # type: (WorkerTask) -> unicode - if worker_task.retry_id: - return '{}-{}'.format(worker_task.id, worker_task.retry_id) - else: - return worker_task.id - - def add(self, worker_task): - # type: (WorkerTask) -> None - name = self._key_name(worker_task.group_id) - value = self._task_identifier(worker_task) - - pipe = self._redis_client.pipeline() - pipe.sadd(name, value)\ - .expire(name, settings.REDIS_EXPIRY)\ - .execute() - - def remove(self, worker_task): - # type: (WorkerTask) -> bool - """ - :return: True if last task in group - """ - name = self._key_name(worker_task.group_id) - value = self._task_identifier(worker_task) - - if self._redis_client.srem(name, value) > 0: - return self._redis_client.scard(name) == 0 - else: - return False - - def active_tasks(self, group_id): - # type: (unicode) -> int - name = self._key_name(group_id) - return self._redis_client.scard(name) diff --git a/eb_sqs/tests/tests_views.py b/eb_sqs/tests/tests_views.py deleted file mode 100644 index 5ffcad0..0000000 --- a/eb_sqs/tests/tests_views.py +++ /dev/null @@ -1,45 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from unittest import TestCase - -from django.test import Client -from mock import Mock - -from eb_sqs import settings -from eb_sqs.worker.worker import Worker -from eb_sqs.worker.worker_exceptions import InvalidMessageFormatException, ExecutionFailedException -from eb_sqs.worker.worker_factory import WorkerFactory - - -class ApiTest(TestCase): - def setup_worker(self, side_effect): - worker_mock = Mock(autospec=Worker) - worker_mock.execute.side_effect = side_effect - - worker_factory_mock = Mock(autospec=WorkerFactory) - worker_factory_mock.create.return_value = worker_mock - - settings.WORKER_FACTORY = worker_factory_mock - - def test_process_endpoint(self): - self.setup_worker(None) - client = Client() - response = client.post('/process', content_type='application/json', data='') - - self.assertEqual(response.status_code, 200) - - def test_process_endpoint_invalid_format(self): - self.setup_worker(InvalidMessageFormatException('', None)) - client = Client() - - response = client.post('/process', content_type='application/json', data='') - - self.assertEqual(response.status_code, 400) - - def test_process_endpoint_invalid_function(self): - self.setup_worker(ExecutionFailedException('', None)) - client = Client() - - response = client.post('/process', content_type='application/json', data='') - - self.assertEqual(response.status_code, 500) diff --git a/eb_sqs/tests/worker/tests_worker.py b/eb_sqs/tests/worker/tests_worker.py index c13a2f3..a53788d 100644 --- a/eb_sqs/tests/worker/tests_worker.py +++ b/eb_sqs/tests/worker/tests_worker.py @@ -6,7 +6,6 @@ from eb_sqs import settings from eb_sqs.decorators import task -from eb_sqs.worker.group_client import GroupClient from eb_sqs.worker.queue_client import QueueClient from eb_sqs.worker.worker import Worker from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException @@ -65,35 +64,18 @@ def setUp(self): settings.DEAD_LETTER_MODE = False self.queue_mock = Mock(autospec=QueueClient) - self.group_mock = Mock(autospec=GroupClient) - self.group_mock.remove.return_value = True - self.worker = Worker(self.queue_mock, self.group_mock) + self.worker = Worker(self.queue_mock) factory_mock = Mock(autospec=WorkerFactory) factory_mock.create.return_value = self.worker settings.WORKER_FACTORY = factory_mock - def setUpGroupsHandling(self): - self.group_set = set() - self.group_mock.add.side_effect = lambda tsk: self.group_set.add('{}-{}'.format(tsk.id, tsk.retry_id)) - self.group_mock.remove.side_effect = lambda tsk: len(self.group_set) == 0 if self.group_set.discard( - '{}-{}'.format(tsk.id, tsk.retry_id)) is None else False - def test_worker_execution_no_group(self): msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}' result = self.worker.execute(msg, 2) self.assertEqual(result, 'Hello World!') - self.group_mock.remove.assert_not_called() - - def test_worker_execution_with_group(self): - msg = '{"id": "id-1", "groupId": "group-5", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}' - - result = self.worker.execute(msg) - - self.assertEqual(result, 'Hello World!') - self.group_mock.remove.assert_called_once() def test_worker_execution_dead_letter_queue(self): settings.DEAD_LETTER_MODE = True @@ -103,12 +85,10 @@ def test_worker_execution_dead_letter_queue(self): result = self.worker.execute(msg) self.assertIsNone(result) - self.group_mock.remove.assert_called_once() def test_delay(self): self.worker.delay(None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 3, False) - self.group_mock.add.assert_not_called() self.queue_mock.add_message.assert_called_once() queue_delay = self.queue_mock.add_message.call_args[0][2] self.assertEqual(queue_delay, 3) @@ -116,15 +96,9 @@ def test_delay(self): def test_delay_inline(self): result = self.worker.delay(None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True) - self.group_mock.add.assert_not_called() self.queue_mock.add_message.assert_not_called() self.assertEqual(result, 'Hello World!') - def test_delay_with_group(self): - self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 3, False) - - self.group_mock.add.assert_called_once() - def test_retry_max_reached_execution(self): with self.assertRaises(MaxRetriesReachedException): max_retries_task.delay(execute_inline=True) @@ -133,74 +107,3 @@ def test_retry_no_limit(self): retries_task.delay(10, execute_inline=True) self.assertEqual(retries_task.retry_num, 10) - - def test_group(self): - settings.GROUP_CALLBACK_TASK = Mock() - - self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True) - - settings.GROUP_CALLBACK_TASK.delay.assert_called_once() - settings.GROUP_CALLBACK_TASK = None - - def test_group_with_exception(self): - settings.GROUP_CALLBACK_TASK = Mock() - self.setUpGroupsHandling() - - with self.assertRaises(TestException): - exception_group_task.delay(group_id='group-id', execute_inline=True) - - self.assertEqual(len(self.group_set), 0) - self.assertEqual(self.group_mock.add.call_count, 1) - self.assertEqual(self.group_mock.remove.call_count, 1) - - settings.GROUP_CALLBACK_TASK.delay.assert_called_once() - settings.GROUP_CALLBACK_TASK = None - - def test_group_retries(self): - settings.GROUP_CALLBACK_TASK = Mock() - self.setUpGroupsHandling() - - repeating_group_task.delay(3, group_id='group-id', execute_inline=True) - - self.assertEqual(len(self.group_set), 0) - self.assertEqual(self.group_mock.add.call_count, 4) - self.assertEqual(self.group_mock.remove.call_count, 4) - - settings.GROUP_CALLBACK_TASK.delay.assert_called_once() - settings.GROUP_CALLBACK_TASK = None - - def test_group_exception_in_retries(self): - settings.GROUP_CALLBACK_TASK = Mock() - self.setUpGroupsHandling() - - with self.assertRaises(TestException): - exception_repeating_group_task.delay(2, group_id='group-id', execute_inline=True) - - self.assertEqual(len(self.group_set), 0) - self.assertEqual(self.group_mock.add.call_count, 3) - self.assertEqual(self.group_mock.remove.call_count, 3) - - settings.GROUP_CALLBACK_TASK.delay.assert_called_once() - settings.GROUP_CALLBACK_TASK = None - - def test_group_match_retries_reached(self): - settings.GROUP_CALLBACK_TASK = Mock() - self.setUpGroupsHandling() - - with self.assertRaises(MaxRetriesReachedException): - max_retries_group_task.delay(group_id='group-id', execute_inline=True) - - self.assertEqual(len(self.group_set), 0) - self.assertEqual(self.group_mock.add.call_count, 5) - self.assertEqual(self.group_mock.remove.call_count, 5) - - settings.GROUP_CALLBACK_TASK.delay.assert_called_once() - settings.GROUP_CALLBACK_TASK = None - - def test_group_callback_string(self): - settings.GROUP_CALLBACK_TASK = 'eb_sqs.tests.worker.tests_worker.global_group_mock' - - self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True) - - global_group_mock.delay.assert_called_once() - settings.GROUP_CALLBACK_TASK = None diff --git a/eb_sqs/urls.py b/eb_sqs/urls.py deleted file mode 100644 index ac27786..0000000 --- a/eb_sqs/urls.py +++ /dev/null @@ -1,9 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from django.conf.urls import url - -from eb_sqs.views import process_task - -urlpatterns = [ - url(r'^process$', process_task, name='process_task'), -] diff --git a/eb_sqs/views.py b/eb_sqs/views.py deleted file mode 100644 index 1ef7b4c..0000000 --- a/eb_sqs/views.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from django.http import HttpResponseServerError, HttpResponse -from django.views.decorators.csrf import csrf_exempt -from django.views.decorators.http import require_http_methods - -from eb_sqs.worker.worker_exceptions import ExecutionFailedException -from eb_sqs.worker.worker_exceptions import InvalidMessageFormatException -from eb_sqs.worker.worker_factory import WorkerFactory - - -@require_http_methods(['POST']) -@csrf_exempt -def process_task(request): - # type: (HttpRequest) -> HttpResponse - try: - worker = WorkerFactory.default().create() - worker.execute(request.body) - return HttpResponse(status=200) - except InvalidMessageFormatException: - return HttpResponse(status=400) - except ExecutionFailedException: - return HttpResponseServerError() diff --git a/eb_sqs/worker/group_client.py b/eb_sqs/worker/group_client.py deleted file mode 100644 index 3b699b8..0000000 --- a/eb_sqs/worker/group_client.py +++ /dev/null @@ -1,31 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from abc import ABCMeta, abstractmethod - -from eb_sqs.worker.worker_task import WorkerTask - - -class GroupClient(object): - __metaclass__ = ABCMeta - - def __init__(self): - # type: () -> None - super(GroupClient, self).__init__() - - @abstractmethod - def add(self, worker_task): - # type: (WorkerTask) -> None - pass - - @abstractmethod - def remove(self, worker_task): - # type: (WorkerTask) -> bool - """ - :return: True if last task in group - """ - pass - - @abstractmethod - def active_tasks(self, group_id): - # type: (unicode) -> int - pass diff --git a/eb_sqs/worker/sqs_redis_worker_factory.py b/eb_sqs/worker/sqs_redis_worker_factory.py deleted file mode 100644 index 4e756e1..0000000 --- a/eb_sqs/worker/sqs_redis_worker_factory.py +++ /dev/null @@ -1,19 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from eb_sqs import settings -from eb_sqs.aws.sqs_queue_client import SqsQueueClient -from eb_sqs.redis.redis_group_client import RedisGroupClient -from eb_sqs.worker.worker import Worker -from eb_sqs.worker.worker_factory import WorkerFactory - - -class SqsRedisWorkerFactory(WorkerFactory): - _WORKER = None # type: Worker - - def __init__(self): - super(SqsRedisWorkerFactory, self).__init__() - - def create(self): - if not SqsRedisWorkerFactory._WORKER: - SqsRedisWorkerFactory._WORKER = Worker(SqsQueueClient(), RedisGroupClient(settings.REDIS_CLIENT)) - return SqsRedisWorkerFactory._WORKER diff --git a/eb_sqs/worker/sqs_worker_factory.py b/eb_sqs/worker/sqs_worker_factory.py new file mode 100644 index 0000000..3b9c32d --- /dev/null +++ b/eb_sqs/worker/sqs_worker_factory.py @@ -0,0 +1,17 @@ +from __future__ import absolute_import, unicode_literals + +from eb_sqs.aws.sqs_queue_client import SqsQueueClient +from eb_sqs.worker.worker import Worker +from eb_sqs.worker.worker_factory import WorkerFactory + + +class SqsWorkerFactory(WorkerFactory): + _WORKER = None # type: Worker + + def __init__(self): + super(SqsWorkerFactory, self).__init__() + + def create(self): + if not SqsWorkerFactory._WORKER: + SqsWorkerFactory._WORKER = Worker(SqsQueueClient()) + return SqsWorkerFactory._WORKER diff --git a/eb_sqs/worker/worker.py b/eb_sqs/worker/worker.py index dca483c..1206fec 100644 --- a/eb_sqs/worker/worker.py +++ b/eb_sqs/worker/worker.py @@ -1,13 +1,9 @@ from __future__ import absolute_import, unicode_literals -import importlib import logging import uuid -from six import string_types - from eb_sqs import settings -from eb_sqs.worker.group_client import GroupClient from eb_sqs.worker.queue_client import QueueDoesNotExistException, QueueClient, QueueClientException from eb_sqs.worker.worker_exceptions import InvalidMessageFormatException, ExecutionFailedException, \ MaxRetriesReachedException, InvalidQueueException, QueueException @@ -17,11 +13,10 @@ class Worker(object): - def __init__(self, queue_client, group_client): - # type: (QueueClient, GroupClient) -> None + def __init__(self, queue_client): + # type: (QueueClient) -> None super(Worker, self).__init__() self.queue_client = queue_client - self.group_client = group_client def execute(self, msg, receive_count=1): # type: (unicode, int) -> Any @@ -51,7 +46,6 @@ def execute(self, msg, receive_count=1): worker_task.retry_id, ) - self._remove_from_group(worker_task) else: logger.debug( 'Execute task %s (%s, retry-id: %s) with args: %s and kwargs: %s', @@ -99,8 +93,6 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr if worker_task.retry >= worker_task.max_retries: raise MaxRetriesReachedException(worker_task.retry) - self._add_to_group(worker_task) - logger.debug('%s task %s (%s, retry-id: %s): %s, %s (%s%s)', 'Retrying' if is_retry else 'Delaying', worker_task.abs_func_name, @@ -117,10 +109,8 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay) return None except QueueDoesNotExistException as ex: - self._remove_from_group(worker_task) raise InvalidQueueException(ex.queue_name) except QueueClientException as ex: - self._remove_from_group(worker_task) logger.warning('Task %s (%s, retry-id: %s) failed to enqueue to %s: %s', worker_task.abs_func_name, worker_task.id, @@ -136,52 +126,4 @@ def _execute_task(self, worker_task): result = worker_task.execute() return result finally: - self._remove_from_group(worker_task) - - def _add_to_group(self, worker_task): - # type: (WorkerTask) -> None - if worker_task.group_id: - logger.debug( - 'Add task %s (%s, retry-id: %s) to group %s', - worker_task.abs_func_name, - worker_task.id, - worker_task.retry_id, - worker_task.group_id, - ) - - self.group_client.add(worker_task) - - def _remove_from_group(self, worker_task): - # type: (WorkerTask) -> None - if worker_task.group_id: - logger.debug( - 'Remove task %s (%s, retry-id: %s) from group %s', - worker_task.abs_func_name, - worker_task.id, - worker_task.retry_id, - worker_task.group_id, - ) - - if self.group_client.remove(worker_task): - self._execute_group_callback(worker_task) - - @staticmethod - def _execute_group_callback(worker_task): - # type: (WorkerTask) -> None - if settings.GROUP_CALLBACK_TASK: - callback = settings.GROUP_CALLBACK_TASK - - if isinstance(callback, string_types): - func_name = callback.split(".")[-1] - func_path = ".".join(callback.split(".")[:-1]) - func_module = importlib.import_module(func_path) - - callback = getattr(func_module, func_name) - - logger.debug( - 'All tasks in group %s finished. Trigger callback %s', - worker_task.group_id, - '{}.{}'.format(callback.__module__, callback.func_name), - ) - - callback.delay(worker_task.group_id) + pass diff --git a/eb_sqs/worker/worker_factory.py b/eb_sqs/worker/worker_factory.py index 0554e10..c7009ef 100644 --- a/eb_sqs/worker/worker_factory.py +++ b/eb_sqs/worker/worker_factory.py @@ -22,7 +22,7 @@ def create(self): def default(): # type: () -> WorkerFactory if not settings.WORKER_FACTORY: - from eb_sqs.worker.sqs_redis_worker_factory import SqsRedisWorkerFactory - return SqsRedisWorkerFactory() + from eb_sqs.worker.sqs_worker_factory import SqsWorkerFactory + return SqsWorkerFactory() else: return settings.WORKER_FACTORY From 0ef48a263215f7ddbac9ef879d3835cfc6bdc54b Mon Sep 17 00:00:00 2001 From: Rohan Patil Date: Thu, 9 Jan 2020 15:31:18 -0800 Subject: [PATCH 2/5] BNCASB-2204: Fixing some more changes --- README.md | 2 +- eb_sqs/tests/worker/tests_worker.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 70f96e9..07e52c6 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d **NOTE:** `retry()` throws a `MaxRetriesReachedException` exception if the maximum number of retries is reached. -#### Executing Tasks without Elastic Beanstalk +#### Executing Tasks Another way of executing tasks is to use the Django command `process_queue`. This command can work with one or more queues, reading from the queues infinitely and executing tasks as they come-in. diff --git a/eb_sqs/tests/worker/tests_worker.py b/eb_sqs/tests/worker/tests_worker.py index a53788d..e9c2a92 100644 --- a/eb_sqs/tests/worker/tests_worker.py +++ b/eb_sqs/tests/worker/tests_worker.py @@ -70,7 +70,7 @@ def setUp(self): factory_mock.create.return_value = self.worker settings.WORKER_FACTORY = factory_mock - def test_worker_execution_no_group(self): + def test_worker_execution(self): msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}' result = self.worker.execute(msg, 2) From 0a84c9f8e49f872db3c7ec1c309c8f3abf6c1102 Mon Sep 17 00:00:00 2001 From: Rohan Patil Date: Thu, 9 Jan 2020 15:43:50 -0800 Subject: [PATCH 3/5] BNCASB-2204: Update the README file --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 07e52c6..01dc003 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -## Django EB SQS - Background Tasks for Elastic Beanstalk and Amazon SQS +## Django EB SQS - Background Tasks for Amazon SQS -django-eb-sqs is a simple task manager for the Elastic Beanstalk Worker Tier. It uses SQS and the [boto3](https://github.com/boto/boto3) library. +django-eb-sqs is a simple task manager for AWS SQS. It uses SQS and the [boto3](https://github.com/boto/boto3) library. ### Installation @@ -76,8 +76,6 @@ This command can work with one or more queues, reading from the queues infinitel python manage.py process_queue --queues ``` -This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker. - You can either use full queue names, or queue prefix using `prefix:*my_example_prefix*` notation. Examples: From 912a8dfe1ef167db8460640257f161bb7be1ffcc Mon Sep 17 00:00:00 2001 From: Rohan Patil Date: Thu, 9 Jan 2020 16:45:29 -0800 Subject: [PATCH 4/5] BNCASB-2204: Remove the redis dependency from development.txt and setup.py --- README.md | 2 +- development.txt | 3 +-- setup.py | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 01dc003..2814ecb 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ python manage.py process_queue --queues queue1,queue2 # process queue1 and queue python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-' ``` -Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of +Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of the `WorkerService` to get informed about the current SQS batch being processed by the management command. #### Auto Tasks diff --git a/development.txt b/development.txt index 144614c..444407a 100644 --- a/development.txt +++ b/development.txt @@ -2,5 +2,4 @@ boto3==1.9.86 Django==1.10.6 mock==2.0.0 moto==1.3.13 -redis==2.10.5 -requests==2.10.0 +requests==2.10.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 6034743..1a057e5 100644 --- a/setup.py +++ b/setup.py @@ -10,13 +10,12 @@ package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(), - description='A SQS worker implementation for Elastic Beanstalk', + description='A simple task manager for AWS SQS', long_description=README, url='https://github.com/cuda-networks/django-eb-sqs', install_requires=[ 'boto3>=1.9.86', 'Django>=1.10.6', - 'redis>=2.10.5', 'requests>=2.10.0', ] ) From 2212d14f2dbad2361886e7dd27b84b9de9534a40 Mon Sep 17 00:00:00 2001 From: Rohan Patil Date: Fri, 10 Jan 2020 15:18:04 -0800 Subject: [PATCH 5/5] BNCASB-2204: Updating the project version number Also resolving PR comments. --- README.md | 2 +- eb_sqs/worker/worker.py | 7 ++----- setup.py | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 2814ecb..bbd3c7e 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d #### Executing Tasks -Another way of executing tasks is to use the Django command `process_queue`. +In order to execute tasks, use the Django command `process_queue`. This command can work with one or more queues, reading from the queues infinitely and executing tasks as they come-in. ```bash diff --git a/eb_sqs/worker/worker.py b/eb_sqs/worker/worker.py index 1206fec..9da54d8 100644 --- a/eb_sqs/worker/worker.py +++ b/eb_sqs/worker/worker.py @@ -122,8 +122,5 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr def _execute_task(self, worker_task): # type: (WorkerTask) -> Any - try: - result = worker_task.execute() - return result - finally: - pass + result = worker_task.execute() + return result diff --git a/setup.py b/setup.py index 1a057e5..d340bd7 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='django-eb-sqs', - version='1.35', + version='1.36', package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(),