Skip to content

Commit

Permalink
fix: queue purge error
Browse files Browse the repository at this point in the history
  • Loading branch information
danut13 committed May 28, 2024
1 parent efcacf0 commit a4a681a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pantos/servicenode/business/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pantos.servicenode.blockchains.factory import get_blockchain_client
from pantos.servicenode.business.base import Interactor
from pantos.servicenode.business.base import InteractorError
from pantos.servicenode.configuration import get_blockchain_config
from pantos.servicenode.configuration import get_plugin_config
from pantos.servicenode.database.access import replace_bids
from pantos.servicenode.plugins import get_bid_plugin
Expand Down Expand Up @@ -116,5 +117,6 @@ def execute_bid_plugin(source_blockchain_id: int):
except Exception:
_logger.critical('unable to replace the bids', exc_info=True)
finally:
assert get_blockchain_config(source_blockchain)['active']
execute_bid_plugin.apply_async(args=[source_blockchain_id],
countdown=delay)
9 changes: 8 additions & 1 deletion pantos/servicenode/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pathlib
import sys

import amqp # type: ignore
import celery # type: ignore
from pantos.common.logging import LogFile
from pantos.common.logging import LogFormat
Expand All @@ -17,6 +18,9 @@
_TRANSFERS_QUEUE_NAME = 'transfers'
_BIDS_QUEUE_NAME = 'bids'

_logger = logging.getLogger(__name__)
"""Logger for this module."""


def is_celery_worker_process() -> bool:
return (len(sys.argv) > 0 and sys.argv[0].endswith('celery')
Expand Down Expand Up @@ -53,7 +57,10 @@ def is_celery_worker_process() -> bool:
if is_celery_worker_process(): # pragma: no cover
# purge the bids queue at startup
with celery_app.connection_for_write() as connection:
connection.default_channel.queue_purge(_BIDS_QUEUE_NAME)
try:
connection.default_channel.queue_purge(_BIDS_QUEUE_NAME)
except amqp.exceptions.NotFound as error:
_logger.warning(str(error))
initialize_plugins(start_worker=True)


Expand Down
10 changes: 8 additions & 2 deletions tests/business/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ def get_bids(self, source_blockchain_id, destination_blockchain_id):
], 10


@unittest.mock.patch('pantos.servicenode.business.plugins.'
'get_blockchain_config')
@unittest.mock.patch('pantos.servicenode.business.plugins.execute_bid_plugin.'
'apply_async')
@unittest.mock.patch.object(BidPluginInteractor, 'replace_bids')
def test_execute_bid_plugin_correct(mocked_replace_bids, mocked_task):
def test_execute_bid_plugin_correct(mocked_replace_bids, mocked_task,
mocked_get_blockchain_config):
mocked_replace_bids.return_value = 1

task = execute_bid_plugin.s(Blockchain.ETHEREUM.value).apply()
Expand All @@ -38,10 +41,13 @@ def test_execute_bid_plugin_correct(mocked_replace_bids, mocked_task):
countdown=1)


@unittest.mock.patch('pantos.servicenode.business.plugins.'
'get_blockchain_config')
@unittest.mock.patch('pantos.servicenode.business.plugins.execute_bid_plugin.'
'apply_async')
@unittest.mock.patch.object(BidPluginInteractor, 'replace_bids')
def test_execute_bid_plugin_interactor_error(mocked_replace_bids, mocked_task):
def test_execute_bid_plugin_interactor_error(mocked_replace_bids, mocked_task,
mocked_get_blockchain_config):
mocked_replace_bids.side_effect = Exception()

task = execute_bid_plugin.s(Blockchain.ETHEREUM.value).apply()
Expand Down

0 comments on commit a4a681a

Please sign in to comment.