Skip to content

Commit

Permalink
Handle SDK reconnection by first checking if the queue exists; Includ…
Browse files Browse the repository at this point in the history
…e pamqp in pyproject.toml
  • Loading branch information
cwang39403 committed Sep 2, 2024
1 parent 3691c62 commit 2ddfcc9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 10 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [

dependencies = [
"aio-pika ~= 9.4.2",
"pamqp ~= 3.3.0",
"omotes-sdk-protocol ~= 0.1.1",
"celery ~= 5.3.6",
"typing-extensions ~= 4.11.0",
Expand Down
23 changes: 23 additions & 0 deletions src/omotes_sdk/internal/common/broker_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
AbstractIncomingMessage,
AbstractExchange,
)
from aio_pika.exceptions import ChannelClosed
from pamqp.common import Arguments

from omotes_sdk.config import RabbitMQConfig
Expand Down Expand Up @@ -361,6 +362,19 @@ async def _declare_queue_and_add_subscription(
)
self._queue_subscription_tasks[queue_name] = queue_subscription_task

async def _queue_exists(self, queue_name: str) -> bool:
"""Check if the queue exists.
:param queue_name: Name of the queue to be checked.
"""
try:
await self._channel.get_queue(queue_name, ensure=True)
logger.info("The %s queue exists", queue_name)
return True
except ChannelClosed as err:
logger.warning(err)
return False

async def _remove_queue_subscription(self, queue_name: str) -> None:
"""Remove subscription from queue and delete the queue if one exists.
Expand Down Expand Up @@ -517,6 +531,15 @@ def declare_queue_and_add_subscription(
self._loop,
).result()

def queue_exists(self, queue_name: str) -> bool:
"""Check if the queue exists.
:param queue_name: Name of the queue to be checked.
"""
return asyncio.run_coroutine_threadsafe(
self._queue_exists(queue_name=queue_name), self._loop
).result()

def remove_queue_subscription(self, queue_name: str) -> None:
"""Remove subscription from queue and delete the queue if one exists.
Expand Down
48 changes: 38 additions & 10 deletions src/omotes_sdk/omotes_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ class OmotesInterface:

JOB_RESULT_MESSAGE_TTL: timedelta = timedelta(hours=48)
"""Default value of job result message TTL."""
JOB_QUEUES_REMOVAL_BUFFER: timedelta = timedelta(seconds=30)
"""To ensure job result, progress, and status queues are removed after this time buffer."""

def __init__(
self,
Expand Down Expand Up @@ -180,7 +178,8 @@ def connect_to_submitted_job(
callback_on_progress_update: Optional[Callable[[Job, JobProgressUpdate], None]],
callback_on_status_update: Optional[Callable[[Job, JobStatusUpdate], None]],
auto_disconnect_on_result: bool,
auto_dead_letter_after_ttl: Optional[timedelta] = JOB_RESULT_MESSAGE_TTL
auto_dead_letter_after_ttl: Optional[timedelta] = JOB_RESULT_MESSAGE_TTL,
reconnect: bool = True
) -> None:
"""(Re)connect to the running job.
Expand All @@ -200,17 +199,46 @@ def connect_to_submitted_job(
Set to `None` to turn off auto dead letter and clean up, but be aware this may lead to
messages and queues to be stored in RabbitMQ indefinitely
(which uses up memory & disk space).
:param reconnect: When True, first check the job queues status and raise an error if not
exist. Default to True.
"""
job_results_queue_name = OmotesQueueNames.job_results_queue_name(job.id)
job_progress_queue_name = OmotesQueueNames.job_progress_queue_name(job.id)
job_status_queue_name = OmotesQueueNames.job_status_queue_name(job.id)

if reconnect:
logger.info("Reconnect to the submitted job %s is set to True. "
+ "Checking job queues status...", job.id)
if not self.broker_if.queue_exists(job_results_queue_name):
raise RuntimeError(
f"The {job_results_queue_name} queue does not exist or is removed. "
"Abort reconnecting to the queue."
)
if (callback_on_progress_update
and not self.broker_if.queue_exists(job_progress_queue_name)):
raise RuntimeError(
f"The {job_progress_queue_name} queue does not exist or is removed. "
"Abort reconnecting to the queue."
)
if (callback_on_status_update
and not self.broker_if.queue_exists(job_status_queue_name)):
raise RuntimeError(
f"The {job_status_queue_name} queue does not exist or is removed. "
"Abort reconnecting to the queue."
)

if auto_disconnect_on_result:
logger.info("Connecting to update for job %s with auto disconnect on result", job.id)
auto_disconnect_handler = self._autodelete_progres_status_queues_on_result
else:
logger.info("Connecting to update for job %s and expect manual disconnect", job.id)
auto_disconnect_handler = None

# TODO: handle reconnection after the message is dead lettered but queue still exists.

if auto_dead_letter_after_ttl is not None:
message_ttl = auto_dead_letter_after_ttl
queue_ttl = auto_dead_letter_after_ttl + self.JOB_QUEUES_REMOVAL_BUFFER
queue_ttl = auto_dead_letter_after_ttl * 2
logger.info("Auto dead letter and cleanup on error after TTL is set. "
+ "The leftover job result message will be dead lettered after %s, "
+ "and leftover job queues will be discarded after %s.",
Expand All @@ -235,10 +263,8 @@ def connect_to_submitted_job(
auto_disconnect_handler,
)

# TODO: raise an error if job queues are not found when the client reconnects.

self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_results_queue_name(job.id),
queue_name=job_results_queue_name,
callback_on_message=callback_handler.callback_on_finished_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
Expand All @@ -247,15 +273,15 @@ def connect_to_submitted_job(
)
if callback_on_progress_update:
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_progress_queue_name(job.id),
queue_name=job_progress_queue_name,
callback_on_message=callback_handler.callback_on_progress_update_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
queue_message_ttl=job_progress_status_queue_ttl
)
if callback_on_status_update:
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_status_queue_name(job.id),
queue_name=job_status_queue_name,
callback_on_message=callback_handler.callback_on_status_update_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
Expand Down Expand Up @@ -307,14 +333,16 @@ def submit_job(
raise UnknownWorkflowException()

job = Job(id=uuid.uuid4(), workflow_type=workflow_type)
reconnect = False
logger.info("Submitting job %s", job.id)
self.connect_to_submitted_job(
job,
callback_on_finished,
callback_on_progress_update,
callback_on_status_update,
auto_disconnect_on_result,
auto_dead_letter_after_ttl
auto_dead_letter_after_ttl,
reconnect
)

if job_timeout is not None:
Expand Down

0 comments on commit 2ddfcc9

Please sign in to comment.