From a4a681abc22bb521af64e6b2c9fbb3a1e85cd788 Mon Sep 17 00:00:00 2001 From: Danut Ilisei Date: Tue, 28 May 2024 14:03:41 +0200 Subject: [PATCH] fix: queue purge error --- pantos/servicenode/business/plugins.py | 2 ++ pantos/servicenode/celery.py | 9 ++++++++- tests/business/test_plugins.py | 10 ++++++++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pantos/servicenode/business/plugins.py b/pantos/servicenode/business/plugins.py index 9a68f44..9f9545a 100644 --- a/pantos/servicenode/business/plugins.py +++ b/pantos/servicenode/business/plugins.py @@ -8,6 +8,7 @@ from pantos.servicenode.blockchains.factory import get_blockchain_client from pantos.servicenode.business.base import Interactor from pantos.servicenode.business.base import InteractorError +from pantos.servicenode.configuration import get_blockchain_config from pantos.servicenode.configuration import get_plugin_config from pantos.servicenode.database.access import replace_bids from pantos.servicenode.plugins import get_bid_plugin @@ -116,5 +117,6 @@ def execute_bid_plugin(source_blockchain_id: int): except Exception: _logger.critical('unable to replace the bids', exc_info=True) finally: + assert get_blockchain_config(source_blockchain)['active'] execute_bid_plugin.apply_async(args=[source_blockchain_id], countdown=delay) diff --git a/pantos/servicenode/celery.py b/pantos/servicenode/celery.py index 1f7ada0..ed69625 100644 --- a/pantos/servicenode/celery.py +++ b/pantos/servicenode/celery.py @@ -5,6 +5,7 @@ import pathlib import sys +import amqp # type: ignore import celery # type: ignore from pantos.common.logging import LogFile from pantos.common.logging import LogFormat @@ -17,6 +18,9 @@ _TRANSFERS_QUEUE_NAME = 'transfers' _BIDS_QUEUE_NAME = 'bids' +_logger = logging.getLogger(__name__) +"""Logger for this module.""" + def is_celery_worker_process() -> bool: return (len(sys.argv) > 0 and sys.argv[0].endswith('celery') @@ -53,7 +57,10 @@ def is_celery_worker_process() -> bool: if is_celery_worker_process(): # pragma: no cover # purge the bids queue at startup with celery_app.connection_for_write() as connection: - connection.default_channel.queue_purge(_BIDS_QUEUE_NAME) + try: + connection.default_channel.queue_purge(_BIDS_QUEUE_NAME) + except amqp.exceptions.NotFound as error: + _logger.warning(str(error)) initialize_plugins(start_worker=True) diff --git a/tests/business/test_plugins.py b/tests/business/test_plugins.py index dac71c8..45f5aeb 100644 --- a/tests/business/test_plugins.py +++ b/tests/business/test_plugins.py @@ -23,10 +23,13 @@ def get_bids(self, source_blockchain_id, destination_blockchain_id): ], 10 +@unittest.mock.patch('pantos.servicenode.business.plugins.' + 'get_blockchain_config') @unittest.mock.patch('pantos.servicenode.business.plugins.execute_bid_plugin.' 'apply_async') @unittest.mock.patch.object(BidPluginInteractor, 'replace_bids') -def test_execute_bid_plugin_correct(mocked_replace_bids, mocked_task): +def test_execute_bid_plugin_correct(mocked_replace_bids, mocked_task, + mocked_get_blockchain_config): mocked_replace_bids.return_value = 1 task = execute_bid_plugin.s(Blockchain.ETHEREUM.value).apply() @@ -38,10 +41,13 @@ def test_execute_bid_plugin_correct(mocked_replace_bids, mocked_task): countdown=1) +@unittest.mock.patch('pantos.servicenode.business.plugins.' + 'get_blockchain_config') @unittest.mock.patch('pantos.servicenode.business.plugins.execute_bid_plugin.' 'apply_async') @unittest.mock.patch.object(BidPluginInteractor, 'replace_bids') -def test_execute_bid_plugin_interactor_error(mocked_replace_bids, mocked_task): +def test_execute_bid_plugin_interactor_error(mocked_replace_bids, mocked_task, + mocked_get_blockchain_config): mocked_replace_bids.side_effect = Exception() task = execute_bid_plugin.s(Blockchain.ETHEREUM.value).apply()