
Clean up left-over job status, progress & result queues #80

Open
lfse-slafleur opened this issue Jul 12, 2024 · 10 comments
Assignees
Labels
enhancement New feature or request

Comments

@lfse-slafleur
Member

lfse-slafleur commented Jul 12, 2024

In certain failure scenarios, a queue for job status, progress & results may not be removed and will remain indefinitely. To properly clean up these resources, we need to:

  • In case of a result queue, check if it is empty.
  • Log that the queues were found and removed.
  • A queue is considered stale if it is unused AND older than 48 hours.

If this happens, it is probably because the relevant SDK is offline, did not reconnect to the relevant queues, and will not reconnect. Therefore, we cannot rely on normal SDK operation for the clean-up. However, we need to make sure we NEVER delete a queue that contains messages and may still be read at some point in the future. Three components appear to be candidates:

  1. RabbitMQ broker using queue TTL
  2. RabbitMQ broker using per-message TTL
  3. Orchestrator
  4. SDK
  • Start a thread on the SDK side to allow an integrating application to choose what to do with the old queue. (However, we would then have to implement it in all SDKs.)

We also need to ensure that the SDK reconnect can handle a queue not being available (throw a proper exception instead of just crashing), e.g. use the callback_on_no_message over here: https://github.com/Project-OMOTES/omotes-sdk-python/blob/main/src/omotes_sdk/omotes_interface.py#L176C13-L176C35
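As an illustration of the "proper exception instead of just crashing" idea, here is a minimal sketch. The exception name and callback signature are hypothetical, not the actual omotes-sdk-python API:

```python
# Illustrative sketch: a no-message callback that surfaces a clear, typed
# exception so the integrating application can react, instead of the SDK
# crashing. Names are hypothetical, not the real omotes-sdk-python API.

class JobResultUnavailable(Exception):
    """Raised when the result queue for a job is gone or will never deliver."""

def on_no_message(job_id: str) -> None:
    # Tell the integrating application explicitly that no result will arrive.
    raise JobResultUnavailable(
        f"No result available for job {job_id}: the result queue is gone or empty."
    )
```

The integrating application can then catch `JobResultUnavailable` and decide whether to resubmit the job or give up.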

@lfse-slafleur
Member Author

Another resource: look for auto-delete https://www.rabbitmq.com/docs/queues

@lfse-slafleur lfse-slafleur added the enhancement New feature or request label Jul 22, 2024
@cwang39403

Hey @lfse-slafleur ,

I made two preliminary commits in the SDK and orchestrator repos as attempts to address this issue.

The automatic job-queue clean-up approach is based on the queue TTL and message TTL arguments used when declaring the queues. If we specify message_ttl < queue_ttl, together with dead_letter_exchange and dead_letter_routing_key, the job result will first be dead-lettered and republished to the dead-letter queue before the queue TTL is reached and the queue is deleted.
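As a sketch of this setup, the queue-declare arguments could be built like this. The helper name and defaults are illustrative; `x-expires`, `x-message-ttl` and the `x-dead-letter-*` keys are the standard RabbitMQ queue arguments:

```python
from datetime import timedelta

def job_queue_arguments(
    queue_ttl: timedelta,
    message_ttl: timedelta,
    dead_letter_exchange: str = "",
    dead_letter_routing_key: str = "job-result-message-dlq",
) -> dict:
    """Build RabbitMQ queue-declare arguments for the TTL + DLX approach.

    message_ttl must be shorter than queue_ttl so the result message is
    dead-lettered *before* the queue itself expires and is deleted.
    """
    if message_ttl >= queue_ttl:
        raise ValueError("message_ttl must be smaller than queue_ttl")
    return {
        "x-expires": int(queue_ttl.total_seconds() * 1000),        # queue TTL, in ms
        "x-message-ttl": int(message_ttl.total_seconds() * 1000),  # per-message TTL, in ms
        "x-dead-letter-exchange": dead_letter_exchange,
        "x-dead-letter-routing-key": dead_letter_routing_key,
    }
```

The resulting dict would be passed as the `arguments` parameter of the queue declaration.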

Considering the requirements listed above:

  • In case of a result queue, check if it is empty -> We can be sure the result queue is empty before it is deleted.
  • A queue is considered stale if it is unused AND older than 48 hours -> Defined by JOB_QUEUES_TTL during job queue declaration.
  • Log that the queues were found and removed
    -> This needs some discussion. I was hoping to log a warning message in the orchestrator when a queue is removed due to reaching its TTL, but it turns out to be not so straightforward. Do you have any suggestions?

Additionally, with the current setup, the job results are dead-lettered and retained permanently in the dead-letter queue until further intervention. Do we want to do anything with these dead-lettered messages? (e.g. also define a TTL on them? expose and consume them from the clients in some way?)

@lfse-slafleur
Member Author

Hey @cwang39403 !

I like the approach. From an architectural point-of-view, I do believe that the timeout on how long a queue will survive needs to be set by the SDK which is what you do over here: Project-OMOTES/omotes-sdk-python@678dfd0#diff-88ccb7d1df8ecf0ace917d21e9a7ad6561ac792085c16a56f377d927181714a3R108 . This is great! Perhaps we can make this configurable just so an SDK can turn off this feature if necessary (by setting the timeouts to None/infinite or something).

With your approach, the (result) messages are already dead-lettered, which is great! This allows us to log clean-ups based on messages instead of queues, so we should be able to fulfill the requirement if we change it slightly: log the dead-lettered messages that were found and removed. This is something the orchestrator could do, so perhaps we make the orchestrator a subscriber to the dead-letter queue? Regarding names, perhaps we can keep it to something like job-result-message-dlq (DLQ is the acronym for dead-letter queue), as sometime in the future we may have multiple dead-letter queues.

Could you also confirm whether a queue may be persistent AND have a queue TTL AND contain messages with a message TTL? What happens when RabbitMQ is rebooted and comes back online before the TTL takes effect? What happens when it comes back online after the TTL should have taken effect?

Curious to hear your thoughts!

@lfse-slafleur
Member Author

Curious also to your findings on what a dead-lettered message looks like. Perhaps it will show us the queue name it was originally from as well. (see comment on the PR)

@cwang39403

cwang39403 commented Aug 15, 2024

Hey @lfse-slafleur , thanks for a couple of nice suggestions and questions! My first reaction to these comments:

  • Perhaps we can make this configurable just so an SDK can turn off this feature if necessary (by setting the timeouts to None/infinite or something).

    I made a new commit to allow this feature to be turned off. Project-OMOTES/omotes-sdk-python@c7b2006

    I did not allow SDK users to specify the job queue or message TTL timedelta, as I think it is probably not needed, and we don't want it to be erroneously set to a small value.

  • Log the dead-lettered messages that were found and removed. This is something the orchestrator could do, so perhaps we make the orchestrator a subscriber to the dead-letter queue

    Indeed, I was thinking about that. I made a new commit in the orchestrator to address this. Project-OMOTES/orchestrator@d0f00ef

    However, once the message is consumed by the orchestrator and logged by the callback, it is deleted from the DLQ, which is unwanted behavior (?). I still need to find a way to persist the message in the DLQ with this implementation.

    2024-08-15 15:33:45,993 [omotes_orchestrator][asyncio_0][main.py:256][INFO]: Received a dead lettered job result: a5a5f528-bd98-42ac-a85c-9a95714cd4a0 with result type as: 0.

  • Regarding names, perhaps we can keep it to something like job-result-message-dlq

    Good one, I renamed it and included it in the new commit.

  • Could you also confirm if a queue may be persistent AND have a queue-TTL and contains messages with message-TTL

    The job queues with queue TTL and message TTL are declared like this. (Note: the screenshot was made before the latest commit.)
    [screenshot]

  • What happens when RabbitMQ is rebooted and comes online before the TTL takes effect? What happens when it comes online after the TTL should have taken effect?

    I did two manual tests with the following steps.

    Rebooted and came online before the TTL

  1. Submit a job (via optimizer.py) with JOB_QUEUES_TTL set to 60 mins and JOB_RESULT_MESSAGE_TTL to 50 mins
    [screenshot]
  2. Take the SDK offline
  3. Orchestrator returns the job result message to the queue
  4. Take OMOTES offline
  5. Wait ~30 mins
  6. Bring OMOTES online; job queues and messages persist as expected
    [screenshot]
  7. Job result message is dead-lettered after 50 mins (relative to the job submission time)
    [screenshot]
  8. Job queues are deleted after another 60 mins (relative to the OMOTES online time) --> This shows that the queue TTL is reset when RabbitMQ reboots.

Rebooted and came online after the TTL

  1. Submit a job (via optimizer.py) with JOB_QUEUES_TTL set to 60 mins and JOB_RESULT_MESSAGE_TTL to 50 mins
  2. Take the SDK offline
  3. Orchestrator returns the job result message to the queue
  4. Take OMOTES offline
  5. Wait 60+ mins
  6. Bring OMOTES online
  7. Job result message is already deleted, but was NOT dead-lettered to the DLQ!
  8. Job queues are deleted after another 60 mins (relative to the OMOTES online time) --> The queue TTL is reset after RabbitMQ reboots.
  • Curious also to your findings on what a dead-lettered message looks like.

    Indeed, as you suspected, the header contains information about the original queue and the reason for the dead-lettering :)
    [screenshot]
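For reference, the header in question is RabbitMQ's x-death header: a list of per-source-queue dead-lettering events. A hypothetical logging helper in the orchestrator might summarize it like this (the field names follow RabbitMQ's documented x-death format; the helper itself is illustrative):

```python
def describe_dead_letter(headers: dict) -> str:
    """Summarize a dead-lettered message's x-death header for logging.

    RabbitMQ adds an "x-death" header listing, per source queue, why and
    how often the message was dead-lettered; the most recent event is first.
    """
    deaths = headers.get("x-death", [])
    if not deaths:
        return "message was not dead-lettered"
    latest = deaths[0]
    return (
        f"dead-lettered from queue '{latest['queue']}' "
        f"(reason: {latest['reason']}, count: {latest.get('count', 1)})"
    )
```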

@cwang39403

I just updated the comment above with the findings on "What happens when RabbitMQ is rebooted and comes online before the TTL takes effect? What happens when RabbitMQ is rebooted and comes online after the TTL takes effect?"

@cwang39403

Hey @lfse-slafleur,

A follow-up on the issue found above: the job result message is deleted but not dead-lettered to the DLQ when RabbitMQ is rebooted after the TTL.

I repeated the test steps above, just to be sure, this time with JOB_QUEUES_TTL and JOB_RESULT_MESSAGE_TTL set to 2 and 1 minutes respectively. The observation stays the same: the job result message is deleted but not found in the DLQ, and the queue is deleted after another 2 minutes.

After some Googling, this might be explained by the following:

  • The message is actually republished to the DLQ but eventually gets lost, because the DLQ is not ready to receive the message while RabbitMQ is rebooting. See the reference below:

    "Prior to RabbitMQ 3.10 dead lettering has not been safe. Messages that get dead lettered from a queue (the "source queue") are not guaranteed to be delivered to the queues routed to by the exchange configured in the dead-letter-exchange policy"
    https://www.rabbitmq.com/blog/2022/03/29/at-least-once-dead-lettering

    [screenshot]
    https://www.rabbitmq.com/docs/dlx#safety

    -> Since we are using classic queues, the at-least-once dead-lettering feature is not available; it is only offered via quorum queues: https://www.rabbitmq.com/docs/classic-queues#features

  • A race condition? The message is deleted immediately, before it can be republished to the DLQ. (Not sure if such a race condition exists by design.)

  • The queue being deleted after another 2 minutes: as discussed, this is probably because the message deletion transitions the queue from the unused to the used state and thus resets the TTL timer.

@lfse-slafleur
Member Author

lfse-slafleur commented Sep 11, 2024

Orchestrator will be updated with #101

Should be used together with SDK >= 3.1.0

Missing: system test(s) & documentation

@lfse-slafleur
Member Author

lfse-slafleur commented Sep 11, 2024

This ticket will be followed up by a new change. In the current design there is a message TTL & a queue TTL. However, in some cases the message TTL may be reached (and the message expired) while the queue TTL is not. For example, if a result in a job_result queue has a message TTL of 2 hours but the job_result queue has a queue TTL of 4 hours, then if the SDK reconnects after 3 hours the message will have expired but the queue will remain indefinitely. The SDK will wait indefinitely for a message.
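The problematic timeline can be sketched as a small decision function. This is illustrative only, and assumes the queue TTL has not been reset by other activity in the meantime:

```python
from datetime import timedelta

def reconnect_outcome(
    reconnect_after: timedelta,
    message_ttl: timedelta,
    queue_ttl: timedelta,
) -> str:
    """What a reconnecting SDK finds, given the two TTLs (simplified model)."""
    if reconnect_after >= queue_ttl:
        return "no-queue"         # queue expired: at least this is a clear signal
    if reconnect_after >= message_ttl:
        return "empty-queue"      # the bad case: SDK waits forever on a live queue
    return "result-available"     # message still present, normal operation
```

With message TTL = 2 h and queue TTL = 4 h, a reconnect after 3 h lands in the "empty-queue" case described above.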

@cwang39403 has been working on this feature, and through his experience we have found that the combination of message TTL and queue TTL is too complex. Expiration needs to be an atomic step, or else the SDK might reach a different conclusion than the broker and/or orchestrator.

Therefore, we propose to drop the message TTL requirement and not use dead-letter queues. Instead, the queue will be dropped upon reaching the queue TTL, including any messages it contains.

This still fits with most requirements:

  • Dropping the messages and queue is atomic.
  • The result queue may or may not be empty. This is by design, so we do not have to fulfill the requirement of checking whether a result is contained in the queue prior to dropping it.
  • Queues may be deleted after 48 hours.

However, we drop the requirement to log a message/queue/job being considered stale/expired when it reaches the expiration criterion. Instead, we will leverage the SDK to figure out that something went wrong.

This is allowed because:

  • The orchestrator can proceed and does not block on any job's progress. Eventually a job is completed and removed from OMOTES. If the result, progress etc. queues are deleted in the meantime, the message is simply silently dropped by RabbitMQ.
  • The orchestrator does not require feedback from this process in any way. For example, all resources pertaining to a job are removed when it is cancelled/timed out/completed.
  • The SDK does require feedback. It either eventually expects a message in the result queue OR it requires feedback that the result will never arrive. Specifically, this latter point can be handled by having the SDK check if a queue exists and reconnect to it, instead of redeclaring the queue upon reconnection. This is possible with a passive queue declare (https://www.rabbitmq.com/amqp-0-9-1-reference#queue.declare.passive) upon reconnecting to an already-submitted job. The SDK has the concern of keeping track of all jobs that have been submitted but not yet completed, so it already knows whether it is submitting a completely new job or reconnecting to an already-submitted one.
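The SDK-side bookkeeping described in the last bullet could be sketched like this (class and method names are illustrative, not the actual SDK API):

```python
class SubmittedJobTracker:
    """Sketch of SDK-side bookkeeping for open jobs (illustrative names).

    The SDK tracks jobs it has submitted but not yet completed, so on
    (re)connect it knows whether to actively declare a fresh result queue
    (new job) or passively declare an existing one (reconnect). A failed
    passive declare then means: the queue expired, the result will never arrive.
    """

    def __init__(self) -> None:
        self._open_jobs: set[str] = set()

    def submit(self, job_id: str) -> None:
        self._open_jobs.add(job_id)

    def complete(self, job_id: str) -> None:
        self._open_jobs.discard(job_id)

    def declare_mode(self, job_id: str) -> str:
        # "passive" -> reconnect to an already-submitted job's queue;
        # "active"  -> brand-new job, create the queue.
        return "passive" if job_id in self._open_jobs else "active"
```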

TODO:

  • We need to check whether a passive queue declare, when the queue does not exist, results in a closed channel. If so, we need to change the underlying code of broker_interface.py to use a new channel for each queue subscription instead of putting all traffic inside a single AMQP channel per connection.

Work will be continued in this ticket: Project-OMOTES/omotes-sdk-python#67

This change solves the following issues:

  • An SDK connects after a message has expired but before the queue has expired. The SDK would wait indefinitely for a response.
  • No use of dead-letter queues, so messages aren't dropped when the message TTL expires while RabbitMQ is offline and later restarted. (Dead-lettering in RabbitMQ has no delivery guarantees for classic queues.)
  • The message TTL firing later than the queue TTL because a job took long, so the message would expire only AFTER the queue is already dropped/expired.

See Project-OMOTES/omotes-sdk-python#63 for more in-depth discussion of these issues.

@cwang39403

@lfse-slafleur About the TODO

We need to check whether a passive queue declare, when the queue does not exist, results in a closed channel. If so, we need to change the underlying code of broker_interface.py to use a new channel for each queue subscription instead of putting all traffic inside a single AMQP channel per connection.

At the moment, there is a reconnect argument in connect_to_submitted_job in https://github.com/Project-OMOTES/omotes-sdk-python/pull/63/files, and it uses self.broker_if.queue_exists (namely https://aio-pika.readthedocs.io/en/latest/apidoc.html#aio_pika.Channel.get_queue) to check if the queue exists. Does that suffice?
