Skip to content

Commit

Permalink
Merge pull request #38 from cuda-networks/support_task_nesting
Browse files Browse the repository at this point in the history
support nesting tasks using a single-invocation patched method
  • Loading branch information
alexeyts authored Jul 24, 2019
2 parents 6c1b6ab + 95b7614 commit 6c45b64
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 31 deletions.
24 changes: 0 additions & 24 deletions eb_sqs/auto_tasks/base_service.py

This file was deleted.

35 changes: 29 additions & 6 deletions eb_sqs/auto_tasks/service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import importlib
import logging

from eb_sqs.auto_tasks.base_service import BaseAutoTaskService, NoopTaskService
from eb_sqs.auto_tasks.exceptions import RetryableTaskException
from eb_sqs.decorators import task
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException
Expand All @@ -24,11 +23,12 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
module = importlib.import_module(module_name) # import module
class_ = getattr(module, class_name) # find class

noop_task_service = NoopTaskService()
instance = class_(auto_task_service=noop_task_service) # instantiate class using NoopTaskService
auto_task_executor_service = _AutoTaskExecutorService(func_name)
instance = class_(auto_task_service=auto_task_executor_service) # instantiate class using _AutoTaskExecutorService

if noop_task_service.is_func_name_registered(func_name):
getattr(instance, func_name)(*args, **kwargs) # invoke method on instance
executor_func_name = auto_task_executor_service.get_executor_func_name()
if executor_func_name:
getattr(instance, executor_func_name)(*args, **kwargs) # invoke method on instance
else:
logger.error(
'Trying to invoke _auto_task_wrapper for unregistered task with module: %s class: %s func: %s args: %s and kwargs: %s',
Expand Down Expand Up @@ -57,7 +57,7 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, func_name, repr(exc)))


class AutoTaskService(BaseAutoTaskService):
class AutoTaskService(object):
def register_task(self, method, queue_name=None, max_retries=None):
# type: (Any, str, int) -> None
instance = method.__self__
Expand All @@ -79,3 +79,26 @@ def _auto_task_wrapper_invoker(*args, **kwargs):
)

setattr(instance, func_name, _auto_task_wrapper_invoker)


class _AutoTaskExecutorService(AutoTaskService):
def __init__(self, func_name):
# type: (str) -> None
self._func_name = func_name

self._executor_func_name = None

def register_task(self, method, queue_name=None, max_retries=None):
# type: (Any, str, int) -> None
if self._func_name == method.__name__:
# circuit breaker to allow actually executing the method once
instance = method.__self__

self._executor_func_name = self._func_name + '__auto_task_executor__'
setattr(instance, self._executor_func_name, getattr(instance, self._func_name))

super(_AutoTaskExecutorService, self).register_task(method, queue_name, max_retries)

def get_executor_func_name(self):
# type: () -> str
return self._executor_func_name
18 changes: 18 additions & 0 deletions eb_sqs/tests/auto_tasks/tests_auto_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ class TestService:

def __init__(self, auto_task_service=None):
self._auto_task_service = auto_task_service or AutoTaskService()

self._auto_task_service.register_task(self.task_method)
self._auto_task_service.register_task(self.task_retry_method, max_retries=self._MAX_RETRY_NUM)

self._auto_task_service.register_task(self.task_recursive_method)
self._auto_task_service.register_task(self.task_other_method)

def task_method(self, *args, **kwargs):
self._TEST_MOCK.task_method(*args, **kwargs)

Expand All @@ -30,6 +34,15 @@ def max_retry_fun():
def non_task_method(self):
self._TEST_MOCK.non_task_method()

def task_recursive_method(self, tries=2):
if tries > 0:
self.task_recursive_method(tries=tries - 1)
else:
self.task_other_method()

def task_other_method(self):
self._TEST_MOCK.task_other_method()


class AutoTasksTest(TestCase):
def setUp(self):
Expand Down Expand Up @@ -61,3 +74,8 @@ def test_non_task_method(self):
)

TestService._TEST_MOCK.non_task_method.assert_not_called()

def test_task_recursive_method(self):
self._test_service.task_recursive_method()

TestService._TEST_MOCK.task_other_method.assert_called_once_with()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='1.31',
version='1.32',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
Expand Down

0 comments on commit 6c45b64

Please sign in to comment.