From d78493e31dc1c781ee6047762bee9f91da89c8c5 Mon Sep 17 00:00:00 2001 From: Sherwin Lu Date: Wed, 12 Apr 2023 12:19:14 -0700 Subject: [PATCH] Python 3.9 migration (#63) --- .circleci/config.yml | 18 ++++---- development.txt | 10 ++--- eb_sqs/auto_tasks/exceptions.py | 9 ++-- eb_sqs/auto_tasks/service.py | 18 ++++---- eb_sqs/aws/sqs_queue_client.py | 15 +++---- eb_sqs/decorators.py | 23 ++++------ eb_sqs/management/commands/process_queue.py | 2 - eb_sqs/settings.py | 2 - eb_sqs/test_settings.py | 2 - eb_sqs/tests/tests_decorators.py | 11 ++--- eb_sqs/tests/worker/tests_worker.py | 2 - eb_sqs/tests/worker/tests_worker_task.py | 3 -- eb_sqs/worker/commons.py | 3 -- eb_sqs/worker/queue_client.py | 11 ++--- eb_sqs/worker/service.py | 50 +++++++++------------ eb_sqs/worker/sqs_worker_factory.py | 2 - eb_sqs/worker/worker.py | 26 +++++------ eb_sqs/worker/worker_exceptions.py | 15 ++----- eb_sqs/worker/worker_factory.py | 14 +----- eb_sqs/worker/worker_task.py | 46 ++++++++----------- setup.py | 3 +- 21 files changed, 104 insertions(+), 181 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 8e36cd3..76c3014 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,24 +3,22 @@ workflows: version: 2 eb-sqs-jobs: jobs: - - test-python-3-7 - - test-python-2-7 + - test-python-3-9 jobs: - test-python-3-7: &test-template + test-python-3-9: docker: - - image: circleci/python:3.7.6 + - image: cimg/python:3.9.16 steps: - add_ssh_keys - checkout - run: - name: Install dependencies + name: Install pip packages command: | - sudo pip install -r development.txt + python3 -m venv venv + . venv/bin/activate + pip install -r development.txt - run: name: Run tests command: | + . venv/bin/activate python -m django test --settings=eb_sqs.test_settings - test-python-2-7: - <<: *test-template - docker: - - image: circleci/python:2.7.16 diff --git a/development.txt b/development.txt index 444407a..321a8e4 100644 --- a/development.txt +++ b/development.txt @@ -1,5 +1,5 @@ -boto3==1.9.86 -Django==1.10.6 -mock==2.0.0 -moto==1.3.13 -requests==2.10.0 \ No newline at end of file +boto3==1.26.99 +Django==4.1.7 +mock==5.0.1 +moto==4.1.6 +requests==2.28.2 diff --git a/eb_sqs/auto_tasks/exceptions.py b/eb_sqs/auto_tasks/exceptions.py index fdbe839..362c0d4 100644 --- a/eb_sqs/auto_tasks/exceptions.py +++ b/eb_sqs/auto_tasks/exceptions.py @@ -1,12 +1,13 @@ +from typing import Any + + class RetryableTaskException(Exception): - def __init__(self, inner, delay=None, count_retries=None, max_retries_func=None): - # type: (Exception, int, bool, Any) -> None + def __init__(self, inner: Exception, delay: int = None, count_retries: bool = None, max_retries_func: Any = None): self._inner = inner self.delay = delay self.count_retries = count_retries self.max_retries_func = max_retries_func - def __repr__(self): - # type: () -> str + def __repr__(self) -> str: return repr(self._inner) diff --git a/eb_sqs/auto_tasks/service.py b/eb_sqs/auto_tasks/service.py index 3ea8fcf..77ccfbe 100644 --- a/eb_sqs/auto_tasks/service.py +++ b/eb_sqs/auto_tasks/service.py @@ -1,5 +1,6 @@ import importlib import logging +from typing import Any from eb_sqs.auto_tasks.exceptions import RetryableTaskException from eb_sqs.decorators import task @@ -24,7 +25,7 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs): class_ = getattr(module, class_name) # find class auto_task_executor_service = _AutoTaskExecutorService(func_name) - instance = class_(auto_task_service=auto_task_executor_service) # instantiate class using _AutoTaskExecutorService + instance = class_(auto_task_service=auto_task_executor_service) executor_func_name = auto_task_executor_service.get_executor_func_name() if executor_func_name: @@ -54,12 +55,12 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs): exc.max_retries_func() else: # by default log an error - logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, func_name, repr(exc))) + logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, + func_name, repr(exc))) class AutoTaskService(object): - def register_task(self, method, queue_name=None, max_retries=None): - # type: (Any, str, int) -> None + def register_task(self, method: Any, queue_name: str = None, max_retries: int = None): instance = method.__self__ class_ = instance.__class__ func_name = method.__name__ @@ -82,14 +83,12 @@ def _auto_task_wrapper_invoker(*args, **kwargs): class _AutoTaskExecutorService(AutoTaskService): - def __init__(self, func_name): - # type: (str) -> None + def __init__(self, func_name: str): self._func_name = func_name self._executor_func_name = None - def register_task(self, method, queue_name=None, max_retries=None): - # type: (Any, str, int) -> None + def register_task(self, method: Any, queue_name: str = None, max_retries: int = None): if self._func_name == method.__name__: # circuit breaker to allow actually executing the method once instance = method.__self__ @@ -99,6 +98,5 @@ def register_task(self, method, queue_name=None, max_retries=None): super(_AutoTaskExecutorService, self).register_task(method, queue_name, max_retries) - def get_executor_func_name(self): - # type: () -> str + def get_executor_func_name(self) -> str: return self._executor_func_name diff --git a/eb_sqs/aws/sqs_queue_client.py b/eb_sqs/aws/sqs_queue_client.py index faaf075..f2fe235 100644 --- a/eb_sqs/aws/sqs_queue_client.py +++ b/eb_sqs/aws/sqs_queue_client.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import, unicode_literals +from typing import Any import boto3 from botocore.config import Config @@ -10,15 +10,13 @@ class SqsQueueClient(QueueClient): def __init__(self): - # type: () -> None self.sqs = boto3.resource('sqs', region_name=settings.AWS_REGION, config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) ) self.queue_cache = {} - def _get_queue(self, queue_name, use_cache=True): - # type: (unicode, bool) -> Any + def _get_queue(self, queue_name: str, use_cache: bool = True) -> Any: full_queue_name = '{}{}'.format(settings.QUEUE_PREFIX, queue_name) queue = self._get_sqs_queue(full_queue_name, use_cache) @@ -27,8 +25,7 @@ def _get_queue(self, queue_name, use_cache=True): return queue - def _get_sqs_queue(self, queue_name, use_cache): - # type: (unicode, bool) -> Any + def _get_sqs_queue(self, queue_name: str, use_cache: bool) -> Any: if use_cache and self.queue_cache.get(queue_name): return self.queue_cache[queue_name] @@ -43,8 +40,7 @@ def _get_sqs_queue(self, queue_name, use_cache): else: raise ex - def _add_sqs_queue(self, queue_name): - # type: (unicode) -> Any + def _add_sqs_queue(self, queue_name: str) -> Any: if settings.AUTO_ADD_QUEUE: queue = self.sqs.create_queue( QueueName=queue_name, @@ -58,8 +54,7 @@ def _add_sqs_queue(self, queue_name): else: raise QueueDoesNotExistException(queue_name) - def add_message(self, queue_name, msg, delay): - # type: (unicode, unicode, int) -> None + def add_message(self, queue_name: str, msg: str, delay: int): try: queue = self._get_queue(queue_name) try: diff --git a/eb_sqs/decorators.py b/eb_sqs/decorators.py index 4b721ef..88a9197 100644 --- a/eb_sqs/decorators.py +++ b/eb_sqs/decorators.py @@ -1,19 +1,16 @@ -from __future__ import absolute_import, unicode_literals +from typing import Any from eb_sqs import settings from eb_sqs.worker.worker_factory import WorkerFactory from eb_sqs.worker.worker_task import WorkerTask -def _get_kwarg_val(kwargs, key, default): - # type: (dict, str, Any) -> Any +def _get_kwarg_val(kwargs: dict, key: str, default: Any) -> Any: return kwargs.pop(key, default) if kwargs else default -def func_delay_decorator(func, queue_name, max_retries_count, use_pickle): - # type: (Any, str, int, bool) -> (tuple, dict) - def wrapper(*args, **kwargs): - # type: (tuple, dict) -> Any +def func_delay_decorator(func: Any, queue_name: str, max_retries_count: int, use_pickle: bool) -> (tuple, dict): + def wrapper(*args: tuple, **kwargs: dict) -> Any: queue = _get_kwarg_val(kwargs, 'queue_name', queue_name if queue_name else settings.DEFAULT_QUEUE) max_retries = _get_kwarg_val(kwargs, 'max_retries', max_retries_count if max_retries_count else settings.DEFAULT_MAX_RETRIES) pickle = _get_kwarg_val(kwargs, 'use_pickle', use_pickle if use_pickle else settings.USE_PICKLE) @@ -28,10 +25,8 @@ def wrapper(*args, **kwargs): return wrapper -def func_retry_decorator(worker_task): - # type: (WorkerTask) -> (tuple, dict) - def wrapper(*args, **kwargs): - # type: (tuple, dict) -> Any +def func_retry_decorator(worker_task: WorkerTask) -> (tuple, dict): + def wrapper(*args: tuple, **kwargs: dict) -> Any: execute_inline = _get_kwarg_val(kwargs, 'execute_inline', False) or settings.EXECUTE_INLINE delay = _get_kwarg_val(kwargs, 'delay', settings.DEFAULT_DELAY) count_retries = _get_kwarg_val(kwargs, 'count_retries', settings.DEFAULT_COUNT_RETRIES) @@ -42,14 +37,12 @@ def wrapper(*args, **kwargs): class task(object): - def __init__(self, queue_name=None, max_retries=None, use_pickle=None): - # type: (str, int, bool) -> None + def __init__(self, queue_name: str = None, max_retries: int = None, use_pickle: bool = None): self.queue_name = queue_name self.max_retries = max_retries self.use_pickle = use_pickle - def __call__(self, *args, **kwargs): - # type: (tuple, dict) -> Any + def __call__(self, *args: tuple, **kwargs: dict) -> Any: func = args[0] func.retry_num = 0 func.delay = func_delay_decorator(func, self.queue_name, self.max_retries, self.use_pickle) diff --git a/eb_sqs/management/commands/process_queue.py b/eb_sqs/management/commands/process_queue.py index 4b02bff..15b3cde 100644 --- a/eb_sqs/management/commands/process_queue.py +++ b/eb_sqs/management/commands/process_queue.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from django.core.management import BaseCommand, CommandError from eb_sqs.worker.service import WorkerService diff --git a/eb_sqs/settings.py b/eb_sqs/settings.py index 2cd00ef..754cf7b 100644 --- a/eb_sqs/settings.py +++ b/eb_sqs/settings.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from django.conf import settings AWS_REGION = getattr(settings, 'EB_AWS_REGION', 'us-east-1') # type: str diff --git a/eb_sqs/test_settings.py b/eb_sqs/test_settings.py index 75a417c..77c579b 100644 --- a/eb_sqs/test_settings.py +++ b/eb_sqs/test_settings.py @@ -34,8 +34,6 @@ }, ] -ROOT_URLCONF = 'eb_sqs.urls' - LOGGING = { 'version': 1, 'disable_existing_loggers': False, diff --git a/eb_sqs/tests/tests_decorators.py b/eb_sqs/tests/tests_decorators.py index 39795ae..9794925 100644 --- a/eb_sqs/tests/tests_decorators.py +++ b/eb_sqs/tests/tests_decorators.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from unittest import TestCase from mock import Mock @@ -11,19 +9,18 @@ @task() -def dummy_task(msg): - # type: (unicode) -> None +def dummy_task(msg: str): if not msg: raise Exception('No message') + @task(queue_name='CustomQueue') def dummy_task_custom_queue(): - # type: (unicode) -> None pass + @task() -def dummy_retry_task(msg): - # type: (unicode) -> None +def dummy_retry_task(msg: str): if dummy_retry_task.retry_num == 0: dummy_retry_task.retry() else: diff --git a/eb_sqs/tests/worker/tests_worker.py b/eb_sqs/tests/worker/tests_worker.py index 719a0d6..ddd59bd 100644 --- a/eb_sqs/tests/worker/tests_worker.py +++ b/eb_sqs/tests/worker/tests_worker.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from unittest import TestCase from mock import Mock diff --git a/eb_sqs/tests/worker/tests_worker_task.py b/eb_sqs/tests/worker/tests_worker_task.py index 8d1968d..d6e9725 100644 --- a/eb_sqs/tests/worker/tests_worker_task.py +++ b/eb_sqs/tests/worker/tests_worker_task.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - import json from unittest import TestCase @@ -8,7 +6,6 @@ class TestObject(object): def __init__(self): - # type: () -> None super(TestObject, self).__init__() self.message = 'Test' diff --git a/eb_sqs/worker/commons.py b/eb_sqs/worker/commons.py index ef1b5b2..cb9074c 100644 --- a/eb_sqs/worker/commons.py +++ b/eb_sqs/worker/commons.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from contextlib import contextmanager from django.db import reset_queries, close_old_connections @@ -7,7 +5,6 @@ @contextmanager def django_db_management(): - # type: () -> None reset_queries() close_old_connections() try: diff --git a/eb_sqs/worker/queue_client.py b/eb_sqs/worker/queue_client.py index 68991a2..09d0fdf 100644 --- a/eb_sqs/worker/queue_client.py +++ b/eb_sqs/worker/queue_client.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from abc import ABCMeta, abstractmethod @@ -8,16 +6,13 @@ class QueueClientException(Exception): class QueueDoesNotExistException(QueueClientException): - def __init__(self, queue_name): - # type: (unicode) -> None + def __init__(self, queue_name: str): super(QueueDoesNotExistException, self).__init__() self.queue_name = queue_name -class QueueClient(object): - __metaclass__ = ABCMeta +class QueueClient(metaclass=ABCMeta): @abstractmethod - def add_message(self, queue_name, msg, delay): - # type: (unicode, unicode, int) -> None + def add_message(self, queue_name: str, msg: str, delay: int): pass diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index cfaa4c8..6d49002 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -1,11 +1,12 @@ -from __future__ import absolute_import, unicode_literals - import logging import signal from datetime import timedelta from time import sleep +from typing import Any import boto3 +from boto3.resources.base import ServiceResource + from botocore.config import Config from botocore.exceptions import ClientError import django.dispatch @@ -19,9 +20,9 @@ logger = logging.getLogger(__name__) -MESSAGES_RECEIVED = django.dispatch.Signal(providing_args=['messages']) -MESSAGES_PROCESSED = django.dispatch.Signal(providing_args=['messages']) -MESSAGES_DELETED = django.dispatch.Signal(providing_args=['messages']) +MESSAGES_RECEIVED = django.dispatch.Signal() +MESSAGES_PROCESSED = django.dispatch.Signal() +MESSAGES_DELETED = django.dispatch.Signal() class WorkerService(object): @@ -29,12 +30,10 @@ class WorkerService(object): _RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount' def __init__(self): - # type: () -> None self._exit_gracefully = False self._last_healthcheck_time = None - def process_queues(self, queue_names): - # type: (list) -> None + def process_queues(self, queue_names: list): signal.signal(signal.SIGTERM, self._exit_called) self.write_healthcheck_file() @@ -83,8 +82,7 @@ def process_queues(self, queue_names): else: self.process_messages(queues, worker, static_queues) - def process_messages(self, queues, worker, static_queues): - # type: (list, Worker, list) -> None + def process_messages(self, queues: list, worker: Worker, static_queues: list): for queue in queues: if self._exit_gracefully: @@ -100,8 +98,8 @@ def process_messages(self, queues, worker, static_queues): for msg in messages: self._execute_user_code(lambda: self._process_message(msg, worker)) msg_entries.append({ - 'Id': msg.message_id, - 'ReceiptHandle': msg.receipt_handle + 'Id': msg.message_id, + 'ReceiptHandle': msg.receipt_handle }) self._send_signal(MESSAGES_PROCESSED, messages=messages) @@ -112,18 +110,19 @@ def process_messages(self, queues, worker, static_queues): except ClientError as exc: error_code = exc.response.get('Error', {}).get('Code', None) if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues: - logger.debug('[django-eb-sqs] Queue was already deleted {}: {}'.format(queue.url, exc), exc_info=True) + logger.debug('[django-eb-sqs] Queue was already deleted {}: {}'.format(queue.url, exc), + exc_info=True) else: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=True) except Exception as exc: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=True) - if timezone.now() - timedelta(seconds=settings.MIN_HEALTHCHECK_WRITE_PERIOD_S) > self._last_healthcheck_time: + if timezone.now() - timedelta( + seconds=settings.MIN_HEALTHCHECK_WRITE_PERIOD_S) > self._last_healthcheck_time: self.write_healthcheck_file() self._last_healthcheck_time = timezone.now() - def delete_messages(self, queue, msg_entries): - # type: (Queue, list) -> None + def delete_messages(self, queue, msg_entries: list): if len(msg_entries) > 0: response = queue.delete_messages(Entries=msg_entries) @@ -133,21 +132,18 @@ def delete_messages(self, queue, msg_entries): if num_failed > 0: logger.warning('[django-eb-sqs] Failed deleting {} messages: {}'.format(num_failed, failed)) - def poll_messages(self, queue): - # type: (Queue) -> list + def poll_messages(self, queue) -> list: return queue.receive_messages( MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, WaitTimeSeconds=settings.WAIT_TIME_S, AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE] ) - def _send_signal(self, dispatch_signal, messages): - # type: (django.dispatch.Signal, list) -> None + def _send_signal(self, dispatch_signal: django.dispatch.Signal, messages: list): if dispatch_signal.has_listeners(sender=self.__class__): self._execute_user_code(lambda: dispatch_signal.send(sender=self.__class__, messages=messages)) - def _process_message(self, msg, worker): - # type: (Message, Worker) -> None + def _process_message(self, msg, worker: Worker): logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE]) @@ -164,20 +160,17 @@ def _process_message(self, msg, worker): logger.warning('[django-eb-sqs] Handling message {} got error: {}'.format(msg.message_id, repr(exc))) @staticmethod - def _execute_user_code(function): - # type: (Any) -> None + def _execute_user_code(function: Any): try: with django_db_management(): function() except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=True) - def get_queues_by_names(self, sqs, queue_names): - # type: (ServiceResource, list) -> list + def get_queues_by_names(self, sqs: ServiceResource, queue_names: list) -> list: return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names] - def get_queues_by_prefixes(self, sqs, prefixes): - # type: (ServiceResource, list) -> list + def get_queues_by_prefixes(self, sqs: ServiceResource, prefixes: list) -> list: queues = [] for prefix in prefixes: @@ -186,7 +179,6 @@ def get_queues_by_prefixes(self, sqs, prefixes): return queues def write_healthcheck_file(self): - # type: () -> None with open(settings.HEALTHCHECK_FILE_NAME, 'w') as file: file.write(timezone.now().isoformat()) diff --git a/eb_sqs/worker/sqs_worker_factory.py b/eb_sqs/worker/sqs_worker_factory.py index 3b9c32d..7172cf2 100644 --- a/eb_sqs/worker/sqs_worker_factory.py +++ b/eb_sqs/worker/sqs_worker_factory.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import, unicode_literals - from eb_sqs.aws.sqs_queue_client import SqsQueueClient from eb_sqs.worker.worker import Worker from eb_sqs.worker.worker_factory import WorkerFactory diff --git a/eb_sqs/worker/worker.py b/eb_sqs/worker/worker.py index 3ea9902..9b9e2ab 100644 --- a/eb_sqs/worker/worker.py +++ b/eb_sqs/worker/worker.py @@ -1,7 +1,6 @@ -from __future__ import absolute_import, unicode_literals - import logging import uuid +from typing import Any from eb_sqs import settings from eb_sqs.worker.queue_client import QueueDoesNotExistException, QueueClient, QueueClientException @@ -13,13 +12,11 @@ class Worker(object): - def __init__(self, queue_client): - # type: (QueueClient) -> None + def __init__(self, queue_client: QueueClient): super(Worker, self).__init__() self.queue_client = queue_client - def execute(self, msg): - # type: (unicode) -> Any + def execute(self, msg: str) -> Any: try: worker_task = WorkerTask.deserialize(msg) except Exception as ex: @@ -69,19 +66,19 @@ def execute(self, msg): raise ExecutionFailedException(worker_task.abs_func_name, ex) - def delay(self, group_id, queue_name, func, args, kwargs, max_retries, use_pickle, delay, execute_inline): - # type: (unicode, unicode, Any, tuple, dict, int, bool, int, bool) -> Any - worker_task = WorkerTask(str(uuid.uuid4()), group_id, queue_name, func, args, kwargs, max_retries, 0, None, use_pickle) + def delay(self, group_id: str, queue_name: str, func: Any, args: tuple, kwargs: dict, max_retries: int, use_pickle: bool, + delay: int, execute_inline: bool) -> Any: + worker_task = WorkerTask(str(uuid.uuid4()), group_id, queue_name, func, args, kwargs, max_retries, 0, None, + use_pickle) return self._enqueue_task(worker_task, delay, execute_inline, False, True) - def retry(self, worker_task, delay, execute_inline, count_retries): - # type: (WorkerTask, int, bool, bool) -> Any + def retry(self, worker_task: WorkerTask, delay: int, execute_inline: bool, count_retries: bool) -> Any: worker_task = worker_task.copy(settings.FORCE_SERIALIZATION) worker_task.retry_id = str(uuid.uuid4()) return self._enqueue_task(worker_task, delay, execute_inline, True, count_retries) - def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries): - # type: (WorkerTask, int, bool, bool, bool) -> Any + def _enqueue_task(self, worker_task: WorkerTask, delay: int, execute_inline: bool, is_retry: bool, + count_retries: bool) -> Any: try: if is_retry and count_retries: worker_task.retry += 1 @@ -116,7 +113,6 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr raise QueueException() @classmethod - def _execute_task(cls, worker_task): - # type: (WorkerTask) -> Any + def _execute_task(cls, worker_task: WorkerTask) -> Any: result = worker_task.execute() return result diff --git a/eb_sqs/worker/worker_exceptions.py b/eb_sqs/worker/worker_exceptions.py index c6b14e4..a0cf8c1 100644 --- a/eb_sqs/worker/worker_exceptions.py +++ b/eb_sqs/worker/worker_exceptions.py @@ -1,29 +1,23 @@ -from __future__ import absolute_import, unicode_literals - - class WorkerException(Exception): pass class InvalidMessageFormatException(WorkerException): - def __init__(self, msg, caught): - # type: (unicode, Exception) -> None + def __init__(self, msg: str, caught: Exception): super(InvalidMessageFormatException, self).__init__() self.msg = msg self.caught = caught class ExecutionFailedException(WorkerException): - def __init__(self, task_name, caught): - # type: (unicode, Exception) -> None + def __init__(self, task_name: str, caught: Exception): super(ExecutionFailedException, self).__init__() self.task_name = task_name self.caught = caught class MaxRetriesReachedException(WorkerException): - def __init__(self, retries): - # type: (int) -> None + def __init__(self, retries: int): super(MaxRetriesReachedException, self).__init__() self.retries = retries @@ -33,7 +27,6 @@ class QueueException(WorkerException): class InvalidQueueException(QueueException): - def __init__(self, queue_name): - # type: (unicode) -> None + def __init__(self, queue_name: str): super(InvalidQueueException, self).__init__() self.queue_name = queue_name diff --git a/eb_sqs/worker/worker_factory.py b/eb_sqs/worker/worker_factory.py index c7009ef..9afb1f8 100644 --- a/eb_sqs/worker/worker_factory.py +++ b/eb_sqs/worker/worker_factory.py @@ -1,26 +1,16 @@ -from __future__ import absolute_import, unicode_literals - from abc import ABCMeta, abstractmethod from eb_sqs import settings from eb_sqs.worker.worker import Worker -class WorkerFactory(object): - __metaclass__ = ABCMeta - - def __init__(self): - # type: () -> None - super(WorkerFactory, self).__init__() - +class WorkerFactory(metaclass=ABCMeta): @abstractmethod - def create(self): - # type: () -> Worker + def create(self) -> Worker: pass @staticmethod def default(): - # type: () -> WorkerFactory if not settings.WORKER_FACTORY: from eb_sqs.worker.sqs_worker_factory import SqsWorkerFactory return SqsWorkerFactory() diff --git a/eb_sqs/worker/worker_task.py b/eb_sqs/worker/worker_task.py index a99ec3b..95d4d5f 100644 --- a/eb_sqs/worker/worker_task.py +++ b/eb_sqs/worker/worker_task.py @@ -1,9 +1,8 @@ -from __future__ import absolute_import, unicode_literals - import base64 import importlib import json import uuid +from typing import Any from eb_sqs import settings @@ -14,8 +13,7 @@ class WorkerTask(object): - def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry, retry_id, use_pickle): - # type: (str, unicode, unicode, Any, tuple, dict, int, int, unicode, bool) -> None + def __init__(self, id: str, group_id: str, queue: str, func: Any, args: tuple, kwargs: dict, max_retries: int, retry: int, retry_id: str, use_pickle: bool): super(WorkerTask, self).__init__() self.id = id self.group_id = group_id @@ -30,35 +28,32 @@ def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry, self.abs_func_name = '{}.{}'.format(self.func.__module__, self.func.__name__) - def execute(self): - # type: () -> Any + def execute(self) -> Any: from eb_sqs.decorators import func_retry_decorator self.func.retry_num = self.retry self.func.retry = func_retry_decorator(worker_task=self) return self.func(*self.args, **self.kwargs) - def serialize(self): - # type: () -> unicode + def serialize(self) -> str: args = WorkerTask._pickle_args(self.args) if self.use_pickle else self.args kwargs = WorkerTask._pickle_args(self.kwargs) if self.use_pickle else self.kwargs task = { - 'id': self.id, - 'groupId': self.group_id, - 'queue': self.queue, - 'func': self.abs_func_name, - 'args': args, - 'kwargs': kwargs, - 'maxRetries': self.max_retries, - 'retry': self.retry, - 'retryId': self.retry_id, - 'pickle': self.use_pickle, - } + 'id': self.id, + 'groupId': self.group_id, + 'queue': self.queue, + 'func': self.abs_func_name, + 'args': args, + 'kwargs': kwargs, + 'maxRetries': self.max_retries, + 'retry': self.retry, + 'retryId': self.retry_id, + 'pickle': self.use_pickle, + } return json.dumps(task) - def copy(self, use_serialization): - # type: (bool) -> WorkerTask + def copy(self, use_serialization: bool): if use_serialization: return WorkerTask.deserialize(self.serialize()) else: @@ -76,13 +71,11 @@ def copy(self, use_serialization): ) @staticmethod - def _pickle_args(args): - # type: (Any) -> unicode + def _pickle_args(args: Any) -> str: return base64.b64encode(pickle.dumps(args, pickle.HIGHEST_PROTOCOL)).decode('utf-8') @staticmethod - def deserialize(msg): - # type: (unicode) -> WorkerTask + def deserialize(msg: str): task = json.loads(msg) id = task.get('id', str(uuid.uuid4())) @@ -110,6 +103,5 @@ def deserialize(msg): return WorkerTask(id, group_id, queue, func, args, kwargs, max_retries, retry, retry_id, use_pickle) @staticmethod - def _unpickle_args(args): - # type: (unicode) -> dict + def _unpickle_args(args: str) -> dict: return pickle.loads(base64.b64decode(args.encode('utf-8'))) diff --git a/setup.py b/setup.py index 248ebc6..3849fb9 100644 --- a/setup.py +++ b/setup.py @@ -22,8 +22,7 @@ ], classifiers=[ 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.9', 'Topic :: Software Development', 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent',