Skip to content

Commit

Permalink
Merge pull request #76 from Project-OMOTES/75-refactor-the-existing-i…
Browse files Browse the repository at this point in the history
…mplementation-of-leftover-job-queues-clean-up-by-removing-the-dead-letter-queue-subscription-from-the-orchestrator

Remove connecting and subscribing to the job result dead letter queue.
  • Loading branch information
cwang39403 authored Oct 28, 2024
2 parents 9ca3eff + e92402f commit 0c9a330
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 78 deletions.
46 changes: 1 addition & 45 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import os
import logging
import signal
import sys
import threading
import pprint
import uuid
from datetime import timedelta, datetime
from datetime import timedelta
from types import FrameType
from typing import Any, Union

Expand Down Expand Up @@ -158,9 +157,6 @@ class Orchestrator:
"""Cancel and delete the job when it is timed out."""
_init_barriers: LifeCycleBarrierManager

LOGS_LOCAL_DIR: str = "../logs"
"""The local directory path to keep log files."""

def __init__(
self,
config: OrchestratorConfig,
Expand Down Expand Up @@ -236,10 +232,6 @@ def start(self) -> None:
)
self.omotes_sdk_if.send_available_workflows()

self.omotes_sdk_if.connect_to_job_result_dead_letter_queue(
callback_on_dead_lettered_job_result=self.dead_lettered_job_result_handler
)

self.postgres_job_manager.start()
self.timeout_job_manager.start()

Expand Down Expand Up @@ -422,42 +414,6 @@ def job_cancellation_handler(self, job_cancellation: JobCancel) -> None:
)
self._cleanup_job(job_id)

def dead_lettered_job_result_handler(self, job_result: JobResult) -> None:
"""Handle the received dead lettered job result.
When the log level is set at the DEBUG level or below,
the dead lettered job result will be written to a local file.
:param job_result: Job result message.
"""
logger.info(
"Received a dead lettered job_%s_result with result type as: %s",
job_result.uuid,
job_result.result_type,
)

log_level = logger.getEffectiveLevel()
if log_level <= logging.DEBUG:
try:
log_dir = self.LOGS_LOCAL_DIR
if not os.path.exists(log_dir):
os.makedirs(log_dir)

cur_time = datetime.now().strftime("%Y%m%d-%H%M%S")
file_name = f"{cur_time}_dead_lettered_job_{job_result.uuid}_result.txt"
file_path = os.path.join(log_dir, file_name)
with open(file_path, "a") as f:
f.write("--------------Logs:\n")
f.write(job_result.logs + "\n")
f.write("--------------Status:\n")
f.write(str(job_result.result_type) + "\n")
f.write("--------------ESDL:\n")
f.write(job_result.output_esdl + "\n")

logger.info("The job result is logged to: %s", file_path)
except Exception as e:
logger.warning("An error occurred while logging the job result: %s", e)

def _cleanup_job(self, job_id: uuid.UUID) -> None:
"""Cleanup any references to job with id `job_id`.
Expand Down
33 changes: 0 additions & 33 deletions src/omotes_orchestrator/sdk_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,6 @@ def callback_on_request_workflows_wrapped(self, message: bytes) -> None:
self.callback_on_request_workflows(request_available_workflows)


@dataclass
class DeadLetteredJobResultHandler:
"""Handler to set up callback when receiving a dead lettered job result."""

callback_on_dead_lettered_job_result: Callable[[JobResult], None]
"""Callback to call when a dead lettered job result is received."""

def callback_on_dead_lettered_job_result_wrapped(self, message: bytes) -> None:
"""Prepare the dead lettered `JobResult` message before passing them to the callback.
:param message: Serialized AMQP message containing a dead lettered job result.
"""
dead_lettered_job = JobResult()
dead_lettered_job.ParseFromString(message)

self.callback_on_dead_lettered_job_result(dead_lettered_job)


class SDKInterface:
"""RabbitMQ interface specifically for the orchestrator."""

Expand Down Expand Up @@ -169,21 +151,6 @@ def connect_to_request_available_workflows(
exchange_name=OmotesQueueNames.omotes_exchange_name(),
)

def connect_to_job_result_dead_letter_queue(
self, callback_on_dead_lettered_job_result: Callable[[JobResult], None]
) -> None:
"""Connect to the job result dead letter queue.
:param callback_on_dead_lettered_job_result: Callback to handle a dead lettered job result.
"""
callback_handler = DeadLetteredJobResultHandler(callback_on_dead_lettered_job_result)
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_result_dead_letter_queue_name(),
callback_on_message=callback_handler.callback_on_dead_lettered_job_result_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
)

def send_job_progress_update(self, job: Job, progress_update: JobProgressUpdate) -> None:
"""Send a job progress update to the SDK.
Expand Down

0 comments on commit 0c9a330

Please sign in to comment.