From 95b7614c89bcd08139f109cb6465997b2eb19692 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Mon, 22 Jul 2019 16:14:10 -0700 Subject: [PATCH] support nesting tasks using a single-invocation patched method --- eb_sqs/auto_tasks/base_service.py | 24 -------------- eb_sqs/auto_tasks/service.py | 35 +++++++++++++++++---- eb_sqs/tests/auto_tasks/tests_auto_tasks.py | 18 +++++++++++ setup.py | 2 +- 4 files changed, 48 insertions(+), 31 deletions(-) delete mode 100644 eb_sqs/auto_tasks/base_service.py diff --git a/eb_sqs/auto_tasks/base_service.py b/eb_sqs/auto_tasks/base_service.py deleted file mode 100644 index 87400fc..0000000 --- a/eb_sqs/auto_tasks/base_service.py +++ /dev/null @@ -1,24 +0,0 @@ -from abc import ABCMeta, abstractmethod - - -class BaseAutoTaskService: - __metaclass__ = ABCMeta - - @abstractmethod - def register_task(self, method, queue_name=None, max_retries=None): - # type: (Any, str, int) -> None - pass - - -class NoopTaskService(BaseAutoTaskService): - def __init__(self): - # type: () -> None - self._registered_func_names = [] - - def register_task(self, method, queue_name=None, max_retries=None): - # type: (Any, str, int) -> None - self._registered_func_names.append(method.__name__) - - def is_func_name_registered(self, func_name): - # type: (str) -> bool - return func_name in self._registered_func_names diff --git a/eb_sqs/auto_tasks/service.py b/eb_sqs/auto_tasks/service.py index 7ed793b..3ea8fcf 100644 --- a/eb_sqs/auto_tasks/service.py +++ b/eb_sqs/auto_tasks/service.py @@ -1,7 +1,6 @@ import importlib import logging -from eb_sqs.auto_tasks.base_service import BaseAutoTaskService, NoopTaskService from eb_sqs.auto_tasks.exceptions import RetryableTaskException from eb_sqs.decorators import task from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException @@ -24,11 +23,12 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs): module = importlib.import_module(module_name) # import module class_ = getattr(module, class_name) # find class - noop_task_service = NoopTaskService() - instance = class_(auto_task_service=noop_task_service) # instantiate class using NoopTaskService + auto_task_executor_service = _AutoTaskExecutorService(func_name) + instance = class_(auto_task_service=auto_task_executor_service) # instantiate class using _AutoTaskExecutorService - if noop_task_service.is_func_name_registered(func_name): - getattr(instance, func_name)(*args, **kwargs) # invoke method on instance + executor_func_name = auto_task_executor_service.get_executor_func_name() + if executor_func_name: + getattr(instance, executor_func_name)(*args, **kwargs) # invoke method on instance else: logger.error( 'Trying to invoke _auto_task_wrapper for unregistered task with module: %s class: %s func: %s args: %s and kwargs: %s', @@ -57,7 +57,7 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs): logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, func_name, repr(exc))) -class AutoTaskService(BaseAutoTaskService): +class AutoTaskService(object): def register_task(self, method, queue_name=None, max_retries=None): # type: (Any, str, int) -> None instance = method.__self__ @@ -79,3 +79,26 @@ def _auto_task_wrapper_invoker(*args, **kwargs): ) setattr(instance, func_name, _auto_task_wrapper_invoker) + + +class _AutoTaskExecutorService(AutoTaskService): + def __init__(self, func_name): + # type: (str) -> None + self._func_name = func_name + + self._executor_func_name = None + + def register_task(self, method, queue_name=None, max_retries=None): + # type: (Any, str, int) -> None + if self._func_name == method.__name__: + # circuit breaker to allow actually executing the method once + instance = method.__self__ + + self._executor_func_name = self._func_name + '__auto_task_executor__' + setattr(instance, self._executor_func_name, getattr(instance, self._func_name)) + + super(_AutoTaskExecutorService, self).register_task(method, queue_name, max_retries) + + def get_executor_func_name(self): + # type: () -> str + return self._executor_func_name diff --git a/eb_sqs/tests/auto_tasks/tests_auto_tasks.py b/eb_sqs/tests/auto_tasks/tests_auto_tasks.py index 9b1b6dd..420d643 100644 --- a/eb_sqs/tests/auto_tasks/tests_auto_tasks.py +++ b/eb_sqs/tests/auto_tasks/tests_auto_tasks.py @@ -13,9 +13,13 @@ class TestService: def __init__(self, auto_task_service=None): self._auto_task_service = auto_task_service or AutoTaskService() + self._auto_task_service.register_task(self.task_method) self._auto_task_service.register_task(self.task_retry_method, max_retries=self._MAX_RETRY_NUM) + self._auto_task_service.register_task(self.task_recursive_method) + self._auto_task_service.register_task(self.task_other_method) + def task_method(self, *args, **kwargs): self._TEST_MOCK.task_method(*args, **kwargs) @@ -30,6 +34,15 @@ def max_retry_fun(): def non_task_method(self): self._TEST_MOCK.non_task_method() + def task_recursive_method(self, tries=2): + if tries > 0: + self.task_recursive_method(tries=tries - 1) + else: + self.task_other_method() + + def task_other_method(self): + self._TEST_MOCK.task_other_method() + class AutoTasksTest(TestCase): def setUp(self): @@ -61,3 +74,8 @@ def test_non_task_method(self): ) TestService._TEST_MOCK.non_task_method.assert_not_called() + + def test_task_recursive_method(self): + self._test_service.task_recursive_method() + + TestService._TEST_MOCK.task_other_method.assert_called_once_with() diff --git a/setup.py b/setup.py index 9f289e8..1ff9bfc 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='django-eb-sqs', - version='1.31', + version='1.32', package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(),