From 2ddfcc981500a4940a0317fbc69ea854c635dfeb Mon Sep 17 00:00:00 2001 From: Cheng-Kai Wang Date: Mon, 2 Sep 2024 09:19:43 +0200 Subject: [PATCH] Handle SDK reconnection by first checking if the queue exists; Include pamqp in pyproject.toml --- pyproject.toml | 1 + .../internal/common/broker_interface.py | 23 +++++++++ src/omotes_sdk/omotes_interface.py | 48 +++++++++++++++---- 3 files changed, 62 insertions(+), 10 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b5b0c5b..3b57eb8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4.2", + "pamqp ~= 3.3.0", "omotes-sdk-protocol ~= 0.1.1", "celery ~= 5.3.6", "typing-extensions ~= 4.11.0", diff --git a/src/omotes_sdk/internal/common/broker_interface.py b/src/omotes_sdk/internal/common/broker_interface.py index 1d05349..40f0516 100644 --- a/src/omotes_sdk/internal/common/broker_interface.py +++ b/src/omotes_sdk/internal/common/broker_interface.py @@ -18,6 +18,7 @@ AbstractIncomingMessage, AbstractExchange, ) +from aio_pika.exceptions import ChannelClosed from pamqp.common import Arguments from omotes_sdk.config import RabbitMQConfig @@ -361,6 +362,19 @@ async def _declare_queue_and_add_subscription( ) self._queue_subscription_tasks[queue_name] = queue_subscription_task + async def _queue_exists(self, queue_name: str) -> bool: + """Check if the queue exists. + + :param queue_name: Name of the queue to be checked. + """ + try: + await self._channel.get_queue(queue_name, ensure=True) + logger.info("The %s queue exists", queue_name) + return True + except ChannelClosed as err: + logger.warning(err) + return False + async def _remove_queue_subscription(self, queue_name: str) -> None: """Remove subscription from queue and delete the queue if one exists. @@ -517,6 +531,15 @@ def declare_queue_and_add_subscription( self._loop, ).result() + def queue_exists(self, queue_name: str) -> bool: + """Check if the queue exists. 
+ + :param queue_name: Name of the queue to be checked. + """ + return asyncio.run_coroutine_threadsafe( + self._queue_exists(queue_name=queue_name), self._loop + ).result() + def remove_queue_subscription(self, queue_name: str) -> None: """Remove subscription from queue and delete the queue if one exists. diff --git a/src/omotes_sdk/omotes_interface.py b/src/omotes_sdk/omotes_interface.py index 0441f42..d0eabc2 100644 --- a/src/omotes_sdk/omotes_interface.py +++ b/src/omotes_sdk/omotes_interface.py @@ -107,8 +107,6 @@ class OmotesInterface: JOB_RESULT_MESSAGE_TTL: timedelta = timedelta(hours=48) """Default value of job result message TTL.""" - JOB_QUEUES_REMOVAL_BUFFER: timedelta = timedelta(seconds=30) - """To ensure job result, progress, and status queues are removed after this time buffer.""" def __init__( self, @@ -180,7 +178,8 @@ def connect_to_submitted_job( callback_on_progress_update: Optional[Callable[[Job, JobProgressUpdate], None]], callback_on_status_update: Optional[Callable[[Job, JobStatusUpdate], None]], auto_disconnect_on_result: bool, - auto_dead_letter_after_ttl: Optional[timedelta] = JOB_RESULT_MESSAGE_TTL + auto_dead_letter_after_ttl: Optional[timedelta] = JOB_RESULT_MESSAGE_TTL, + reconnect: bool = True ) -> None: """(Re)connect to the running job. @@ -200,7 +199,34 @@ def connect_to_submitted_job( Set to `None` to turn off auto dead letter and clean up, but be aware this may lead to messages and queues to be stored in RabbitMQ indefinitely (which uses up memory & disk space). + :param reconnect: When True, first check that the job queues exist and raise an error if + they do not. Defaults to True. """ + job_results_queue_name = OmotesQueueNames.job_results_queue_name(job.id) + job_progress_queue_name = OmotesQueueNames.job_progress_queue_name(job.id) + job_status_queue_name = OmotesQueueNames.job_status_queue_name(job.id) + + if reconnect: + logger.info("Reconnect to the submitted job %s is set to True. 
" + + "Checking job queues status...", job.id) + if not self.broker_if.queue_exists(job_results_queue_name): + raise RuntimeError( + f"The {job_results_queue_name} queue does not exist or is removed. " + "Abort reconnecting to the queue." + ) + if (callback_on_progress_update + and not self.broker_if.queue_exists(job_progress_queue_name)): + raise RuntimeError( + f"The {job_progress_queue_name} queue does not exist or is removed. " + "Abort reconnecting to the queue." + ) + if (callback_on_status_update + and not self.broker_if.queue_exists(job_status_queue_name)): + raise RuntimeError( + f"The {job_status_queue_name} queue does not exist or is removed. " + "Abort reconnecting to the queue." + ) + if auto_disconnect_on_result: logger.info("Connecting to update for job %s with auto disconnect on result", job.id) auto_disconnect_handler = self._autodelete_progres_status_queues_on_result @@ -208,9 +234,11 @@ def connect_to_submitted_job( logger.info("Connecting to update for job %s and expect manual disconnect", job.id) auto_disconnect_handler = None + # TODO: handle reconnection after the message is dead lettered but queue still exists. + if auto_dead_letter_after_ttl is not None: message_ttl = auto_dead_letter_after_ttl - queue_ttl = auto_dead_letter_after_ttl + self.JOB_QUEUES_REMOVAL_BUFFER + queue_ttl = auto_dead_letter_after_ttl * 2 logger.info("Auto dead letter and cleanup on error after TTL is set. " + "The leftover job result message will be dead lettered after %s, " + "and leftover job queues will be discarded after %s.", @@ -235,10 +263,8 @@ def connect_to_submitted_job( auto_disconnect_handler, ) - # TODO: raise an error if job queues are not found when the client reconnects. 
- self.broker_if.declare_queue_and_add_subscription( - queue_name=OmotesQueueNames.job_results_queue_name(job.id), + queue_name=job_results_queue_name, callback_on_message=callback_handler.callback_on_finished_wrapped, queue_type=AMQPQueueType.DURABLE, exchange_name=OmotesQueueNames.omotes_exchange_name(), @@ -247,7 +273,7 @@ def connect_to_submitted_job( ) if callback_on_progress_update: self.broker_if.declare_queue_and_add_subscription( - queue_name=OmotesQueueNames.job_progress_queue_name(job.id), + queue_name=job_progress_queue_name, callback_on_message=callback_handler.callback_on_progress_update_wrapped, queue_type=AMQPQueueType.DURABLE, exchange_name=OmotesQueueNames.omotes_exchange_name(), @@ -255,7 +281,7 @@ def connect_to_submitted_job( ) if callback_on_status_update: self.broker_if.declare_queue_and_add_subscription( - queue_name=OmotesQueueNames.job_status_queue_name(job.id), + queue_name=job_status_queue_name, callback_on_message=callback_handler.callback_on_status_update_wrapped, queue_type=AMQPQueueType.DURABLE, exchange_name=OmotesQueueNames.omotes_exchange_name(), @@ -307,6 +333,7 @@ def submit_job( raise UnknownWorkflowException() job = Job(id=uuid.uuid4(), workflow_type=workflow_type) + reconnect = False logger.info("Submitting job %s", job.id) self.connect_to_submitted_job( job, @@ -314,7 +341,8 @@ def submit_job( callback_on_progress_update, callback_on_status_update, auto_disconnect_on_result, - auto_dead_letter_after_ttl + auto_dead_letter_after_ttl, + reconnect ) if job_timeout is not None: