diff --git a/src/omotes_orchestrator/main.py b/src/omotes_orchestrator/main.py index 59c9996..5b780d0 100644 --- a/src/omotes_orchestrator/main.py +++ b/src/omotes_orchestrator/main.py @@ -1,11 +1,10 @@ -import os import logging import signal import sys import threading import pprint import uuid -from datetime import timedelta, datetime +from datetime import timedelta from types import FrameType from typing import Any, Union @@ -158,9 +157,6 @@ class Orchestrator: """Cancel and delete the job when it is timed out.""" _init_barriers: LifeCycleBarrierManager - LOGS_LOCAL_DIR: str = "../logs" - """The local directory path to keep log files.""" - def __init__( self, config: OrchestratorConfig, @@ -236,10 +232,6 @@ def start(self) -> None: ) self.omotes_sdk_if.send_available_workflows() - self.omotes_sdk_if.connect_to_job_result_dead_letter_queue( - callback_on_dead_lettered_job_result=self.dead_lettered_job_result_handler - ) - self.postgres_job_manager.start() self.timeout_job_manager.start() @@ -422,42 +414,6 @@ def job_cancellation_handler(self, job_cancellation: JobCancel) -> None: ) self._cleanup_job(job_id) - def dead_lettered_job_result_handler(self, job_result: JobResult) -> None: - """Handle the received dead lettered job result. - - When the log level is set at the DEBUG level or below, - the dead lettered job result will be written to a local file. - - :param job_result: Job result message. - """ - logger.info( - "Received a dead lettered job_%s_result with result type as: %s", - job_result.uuid, - job_result.result_type, - ) - - log_level = logger.getEffectiveLevel() - if log_level <= logging.DEBUG: - try: - log_dir = self.LOGS_LOCAL_DIR - if not os.path.exists(log_dir): - os.makedirs(log_dir) - - cur_time = datetime.now().strftime("%Y%m%d-%H%M%S") - file_name = f"{cur_time}_dead_lettered_job_{job_result.uuid}_result.txt" - file_path = os.path.join(log_dir, file_name) - with open(file_path, "a") as f: - f.write("--------------Logs:\n") - f.write(job_result.logs + "\n") - f.write("--------------Status:\n") - f.write(str(job_result.result_type) + "\n") - f.write("--------------ESDL:\n") - f.write(job_result.output_esdl + "\n") - - logger.info("The job result is logged to: %s", file_path) - except Exception as e: - logger.warning("An error occurred while logging the job result: %s", e) - def _cleanup_job(self, job_id: uuid.UUID) -> None: """Cleanup any references to job with id `job_id`. diff --git a/src/omotes_orchestrator/sdk_interface.py b/src/omotes_orchestrator/sdk_interface.py index de94f6b..a506c70 100644 --- a/src/omotes_orchestrator/sdk_interface.py +++ b/src/omotes_orchestrator/sdk_interface.py @@ -78,24 +78,6 @@ def callback_on_request_workflows_wrapped(self, message: bytes) -> None: self.callback_on_request_workflows(request_available_workflows) -@dataclass -class DeadLetteredJobResultHandler: - """Handler to set up callback when receiving a dead lettered job result.""" - - callback_on_dead_lettered_job_result: Callable[[JobResult], None] - """Callback to call when a dead lettered job result is received.""" - - def callback_on_dead_lettered_job_result_wrapped(self, message: bytes) -> None: - """Prepare the dead lettered `JobResult` message before passing them to the callback. - - :param message: Serialized AMQP message containing a dead lettered job result. - """ - dead_lettered_job = JobResult() - dead_lettered_job.ParseFromString(message) - - self.callback_on_dead_lettered_job_result(dead_lettered_job) - - class SDKInterface: """RabbitMQ interface specifically for the orchestrator.""" @@ -169,21 +151,6 @@ def connect_to_request_available_workflows( exchange_name=OmotesQueueNames.omotes_exchange_name(), ) - def connect_to_job_result_dead_letter_queue( - self, callback_on_dead_lettered_job_result: Callable[[JobResult], None] - ) -> None: - """Connect to the job result dead letter queue. - - :param callback_on_dead_lettered_job_result: Callback to handle a dead lettered job result. - """ - callback_handler = DeadLetteredJobResultHandler(callback_on_dead_lettered_job_result) - self.broker_if.declare_queue_and_add_subscription( - queue_name=OmotesQueueNames.job_result_dead_letter_queue_name(), - callback_on_message=callback_handler.callback_on_dead_lettered_job_result_wrapped, - queue_type=AMQPQueueType.DURABLE, - exchange_name=OmotesQueueNames.omotes_exchange_name(), - ) - def send_job_progress_update(self, job: Job, progress_update: JobProgressUpdate) -> None: """Send a job progress update to the SDK.