Skip to content

Commit

Permalink
Merge pull request #80 from Project-OMOTES/67-refactor-implementation-of-left-over-job-queues-clean-up
Browse files (browse the repository at this point in the history)

67 refactor implementation of left over job queues clean up
  • Loading branch information
cwang39403 authored Oct 28, 2024
2 parents 1894998 + 6b8776f commit 6a91e56
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 164 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ requires = [
enabled = true

[tool.pytest.ini_options]
addopts = "--cov=omotes_sdk --cov-report html --cov-report term-missing --cov-fail-under 63"
addopts = "--cov=omotes_sdk --cov-report html --cov-report term-missing --cov-fail-under 62"

[tool.coverage.run]
source = ["src"]
Expand Down
54 changes: 16 additions & 38 deletions src/omotes_sdk/internal/common/broker_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,13 @@ def to_argument(self) -> AioPikaQueueTypeArguments:


@dataclass()
class QueueMessageTTLArguments():
class QueueTTLArguments():
"""Construct additional time-to-live arguments when declaring a queue."""

queue_ttl: Optional[timedelta] = None
"""Expires and deletes the queue after a period of time when it is not used.
"""Expires and deletes the queue after a period of time when it is unused.
The timedelta must be convertible into a positive integer number of milliseconds.
Ref: https://www.rabbitmq.com/docs/ttl#queue-ttl"""
message_ttl: Optional[timedelta] = None
"""Expires and deletes the message within the queue after the defined TTL.
The timedelta must be convertible into a non-negative integer.
Ref: https://www.rabbitmq.com/docs/ttl#per-queue-message-ttl"""
dead_letter_routing_key: Optional[str] = None
"""When specified, the expired message is republished to the designated dead letter queue.
If not set, the message's own routing key is used.
Ref: https://www.rabbitmq.com/docs/dlx#routing"""
dead_letter_exchange: Optional[str] = None
"""Dead letter exchange name.
Ref: https://www.rabbitmq.com/docs/dlx"""

def to_argument(self) -> Arguments:
"""Convert the time-to-live variables to the aio-pika `declare_queue` keyword arguments.
Expand All @@ -150,18 +139,7 @@ def to_argument(self) -> Arguments:
raise ValueError("queue_ttl must be a positive value, "
+ f"{self.queue_ttl} received.")
arguments["x-expires"] = int(self.queue_ttl.total_seconds() * 1000)
if self.message_ttl is not None:
if self.message_ttl < timedelta(0):
raise ValueError("message_ttl can not be a negative value, "
+ f"{self.message_ttl} received.")
if self.queue_ttl is not None and self.message_ttl > self.queue_ttl:
# Raise an error as it serves no purpose.
raise ValueError("message_ttl shall be smaller or equal to queue_ttl.")
arguments["x-message-ttl"] = int(self.message_ttl.total_seconds() * 1000)
if self.dead_letter_routing_key is not None:
arguments["x-dead-letter-routing-key"] = str(self.dead_letter_routing_key)
if self.dead_letter_exchange is not None:
arguments["x-dead-letter-exchange"] = str(self.dead_letter_exchange)

return arguments


Expand Down Expand Up @@ -274,7 +252,7 @@ async def _declare_queue(
queue_type: AMQPQueueType,
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
queue_message_ttl: Optional[QueueMessageTTLArguments] = None
queue_ttl: Optional[QueueTTLArguments] = None
) -> AbstractQueue:
"""Declare an AMQP queue.
Expand All @@ -284,16 +262,16 @@ async def _declare_queue(
key of the queue name. If none, the queue is only bound to the name of the queue.
If not none, then the exchange_name must be set as well.
:param exchange_name: Name of the exchange on which the messages will be published.
:param queue_message_ttl: Additional arguments to specify queue or message TTL.
:param queue_ttl: Additional queue TTL arguments.
"""
if bind_to_routing_key is not None and exchange_name is None:
raise RuntimeError(
f"Routing key for binding was set to {bind_to_routing_key} but no "
f"exchange name was provided."
)

if queue_message_ttl is not None:
ttl_arguments = queue_message_ttl.to_argument()
if queue_ttl is not None:
ttl_arguments = queue_ttl.to_argument()
else:
ttl_arguments = None

Expand Down Expand Up @@ -324,7 +302,7 @@ async def _declare_queue_and_add_subscription(
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
delete_after_messages: Optional[int] = None,
queue_message_ttl: Optional[QueueMessageTTLArguments] = None
queue_ttl: Optional[QueueTTLArguments] = None
) -> None:
"""Declare an AMQP queue and subscribe to the messages.
Expand All @@ -338,7 +316,7 @@ async def _declare_queue_and_add_subscription(
:param exchange_name: Name of the exchange on which the messages will be published.
:param delete_after_messages: Delete the subscription & queue after this limit of messages
have been successfully processed.
:param queue_message_ttl: Additional arguments to specify queue or message TTL.
:param queue_ttl: Additional queue TTL arguments.
"""
if queue_name in self._queue_subscription_consumer_by_name:
logger.error(
Expand All @@ -348,7 +326,7 @@ async def _declare_queue_and_add_subscription(
raise RuntimeError(f"Queue subscription for {queue_name} already exists.")

queue = await self._declare_queue(
queue_name, queue_type, bind_to_routing_key, exchange_name, queue_message_ttl
queue_name, queue_type, bind_to_routing_key, exchange_name, queue_ttl
)

queue_consumer = QueueSubscriptionConsumer(
Expand Down Expand Up @@ -472,7 +450,7 @@ def declare_queue(
queue_type: AMQPQueueType,
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
queue_message_ttl: Optional[QueueMessageTTLArguments] = None
queue_ttl: Optional[QueueTTLArguments] = None
) -> None:
"""Declare an AMQP queue.
Expand All @@ -482,15 +460,15 @@ def declare_queue(
key of the queue name. If none, the queue is only bound to the name of the queue.
If not none, then the exchange_name must be set as well.
:param exchange_name: Name of the exchange on which the messages will be published.
:param queue_message_ttl: Additional arguments to specify queue or message TTL.
:param queue_ttl: Additional queue TTL arguments.
"""
asyncio.run_coroutine_threadsafe(
self._declare_queue(
queue_name=queue_name,
queue_type=queue_type,
bind_to_routing_key=bind_to_routing_key,
exchange_name=exchange_name,
queue_message_ttl=queue_message_ttl,
queue_ttl=queue_ttl,
),
self._loop,
).result()
Expand All @@ -503,7 +481,7 @@ def declare_queue_and_add_subscription(
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
delete_after_messages: Optional[int] = None,
queue_message_ttl: Optional[QueueMessageTTLArguments] = None
queue_ttl: Optional[QueueTTLArguments] = None
) -> None:
"""Declare an AMQP queue and subscribe to the messages.
Expand All @@ -516,7 +494,7 @@ def declare_queue_and_add_subscription(
:param exchange_name: Name of the exchange on which the messages will be published.
:param delete_after_messages: Delete the subscription & queue after this limit of messages
have been successfully processed.
:param queue_message_ttl: Additional arguments to specify queue or message TTL.
:param queue_ttl: Additional queue TTL arguments.
"""
asyncio.run_coroutine_threadsafe(
self._declare_queue_and_add_subscription(
Expand All @@ -526,7 +504,7 @@ def declare_queue_and_add_subscription(
bind_to_routing_key=bind_to_routing_key,
exchange_name=exchange_name,
delete_after_messages=delete_after_messages,
queue_message_ttl=queue_message_ttl,
queue_ttl=queue_ttl,
),
self._loop,
).result()
Expand Down
65 changes: 26 additions & 39 deletions src/omotes_sdk/omotes_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from omotes_sdk.internal.common.broker_interface import (
BrokerInterface,
AMQPQueueType,
QueueMessageTTLArguments,
QueueTTLArguments,
)
from omotes_sdk.config import RabbitMQConfig
from omotes_sdk_protocol.job_pb2 import (
Expand Down Expand Up @@ -108,8 +108,8 @@ class OmotesInterface:
"""How long the SDK should wait for the first reply when requesting the current workflow
definitions from the orchestrator."""

JOB_RESULT_MESSAGE_TTL: timedelta = timedelta(hours=48)
"""Default value of job result message TTL."""
JOB_QUEUES_TTL: timedelta = timedelta(hours=48)
"""Default value of job result, progress, and status queue TTL."""

def __init__(
self,
Expand Down Expand Up @@ -181,7 +181,7 @@ def connect_to_submitted_job(
callback_on_progress_update: Optional[Callable[[Job, JobProgressUpdate], None]],
callback_on_status_update: Optional[Callable[[Job, JobStatusUpdate], None]],
auto_disconnect_on_result: bool,
auto_dead_letter_after_ttl: Optional[timedelta] = JOB_RESULT_MESSAGE_TTL,
auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL,
reconnect: bool = True,
) -> None:
"""(Re)connect to the running job.
Expand All @@ -196,12 +196,11 @@ def connect_to_submitted_job(
:param auto_disconnect_on_result: Remove/disconnect from all queues pertaining to this job
once the result is received and handled without exceptions through
`callback_on_finished`.
:param auto_dead_letter_after_ttl: When erroneous situations occur (e.g. client is offline),
the job result message (if available) will be dead lettered after the given TTL,
and all queues of this job will be removed subsequently. Default to 48 hours if unset.
Set to `None` to turn off auto dead letter and clean up, but be aware this may lead to
messages and queues to be stored in RabbitMQ indefinitely
(which uses up memory & disk space).
:param auto_cleanup_after_ttl: When erroneous situations occur (e.g. the client is offline),
all queues pertaining to this job will be removed once they have been unused for the
given TTL. Defaults to 48 hours if unset. Set to `None` to turn off automatic cleanup,
but be aware that this may lead to leftover messages and queues being stored
in RabbitMQ indefinitely (which uses up memory & disk space).
:param reconnect: When True, first check the status of the job queues and raise an error
if they do not exist. Defaults to True.
"""
Expand Down Expand Up @@ -240,32 +239,21 @@ def connect_to_submitted_job(
logger.info("Connecting to update for job %s and expect manual disconnect", job.id)
auto_disconnect_handler = None

# TODO: handle reconnection after the message is dead lettered but queue still exists.

if auto_dead_letter_after_ttl is not None:
message_ttl = auto_dead_letter_after_ttl
queue_ttl = auto_dead_letter_after_ttl * 2
if auto_cleanup_after_ttl is not None:
queue_ttl = auto_cleanup_after_ttl
logger.info(
"Auto dead letter and cleanup on error after TTL is set. "
+ "The leftover job result message will be dead lettered after %s, "
"Auto job queues clean up on error after TTL is set. "
+ "The leftover job messages will be dropped, "
+ "and leftover job queues will be discarded after %s.",
message_ttl,
queue_ttl,
)
job_result_queue_message_ttl = QueueMessageTTLArguments(
queue_ttl=queue_ttl,
message_ttl=message_ttl,
dead_letter_routing_key=OmotesQueueNames.job_result_dead_letter_queue_name(),
dead_letter_exchange=OmotesQueueNames.omotes_exchange_name(),
)
job_progress_status_queue_ttl = QueueMessageTTLArguments(queue_ttl=queue_ttl)
job_queue_ttl = QueueTTLArguments(queue_ttl=queue_ttl)
else:
logger.info(
"Auto dead letter and cleanup on error after TTL is not set. "
"Auto job queues clean up on error after TTL is not set. "
+ "Manual cleanup on leftover job queues and messages might be required."
)
job_result_queue_message_ttl = None
job_progress_status_queue_ttl = None
job_queue_ttl = None

callback_handler = JobSubmissionCallbackHandler(
job,
Expand All @@ -281,23 +269,23 @@ def connect_to_submitted_job(
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
delete_after_messages=1,
queue_message_ttl=job_result_queue_message_ttl,
queue_ttl=job_queue_ttl,
)
if callback_on_progress_update:
self.broker_if.declare_queue_and_add_subscription(
queue_name=job_progress_queue_name,
callback_on_message=callback_handler.callback_on_progress_update_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
queue_message_ttl=job_progress_status_queue_ttl,
queue_ttl=job_queue_ttl,
)
if callback_on_status_update:
self.broker_if.declare_queue_and_add_subscription(
queue_name=job_status_queue_name,
callback_on_message=callback_handler.callback_on_status_update_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
queue_message_ttl=job_progress_status_queue_ttl,
queue_ttl=job_queue_ttl,
)

def submit_job(
Expand All @@ -311,7 +299,7 @@ def submit_job(
callback_on_status_update: Optional[Callable[[Job, JobStatusUpdate], None]],
auto_disconnect_on_result: bool,
job_reference: Optional[str] = None,
auto_dead_letter_after_ttl: Optional[timedelta] = JOB_RESULT_MESSAGE_TTL,
auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL,
) -> Job:
"""Submit a new job and connect to progress and status updates and the job result.
Expand All @@ -331,12 +319,11 @@ def submit_job(
`callback_on_finished`.
:param job_reference: An optional reference to the submitted job which is used in the
name of the output ESDL as well as in internal logging of OMOTES.
:param auto_dead_letter_after_ttl: When erroneous situations occur (e.g. client is offline),
the job result message (if available) will be dead lettered after the given TTL,
and all queues of this job will be removed subsequently. Default to 48 hours if unset.
Set to `None` to turn off auto dead letter and clean up, but be aware this may lead to
messages and queues to be stored in RabbitMQ indefinitely
(which uses up memory & disk space).
:param auto_cleanup_after_ttl: When erroneous situations occur (e.g. the client is offline),
all queues pertaining to this job will be removed once they have been unused for the
given TTL. Defaults to 48 hours if unset. Set to `None` to turn off automatic cleanup,
but be aware that this may lead to leftover messages and queues being stored
in RabbitMQ indefinitely (which uses up memory & disk space).
:raises UnknownWorkflowException: If `workflow_type` is unknown as a possible workflow in
this interface.
:return: The job handle which is created. This object needs to be saved persistently by the
Expand All @@ -356,7 +343,7 @@ def submit_job(
callback_on_progress_update,
callback_on_status_update,
auto_disconnect_on_result,
auto_dead_letter_after_ttl,
auto_cleanup_after_ttl,
reconnect,
)

Expand Down
8 changes: 0 additions & 8 deletions src/omotes_sdk/queue_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,3 @@ def request_available_workflows_queue_name() -> str:
:return: The queue name.
"""
return "request_available_workflows"

@staticmethod
def job_result_dead_letter_queue_name() -> str:
"""Generate the job result dead letter queue name.
:return: The queue name.
"""
return "job_result_message_dlq"
Loading

0 comments on commit 6a91e56

Please sign in to comment.