From d8cb90a134e71056e9f32630f567d4fa3553b347 Mon Sep 17 00:00:00 2001
From: Olympia Dartsi
Date: Tue, 2 Jul 2024 11:27:06 -0700
Subject: [PATCH] step 3: implement process restart for alabos

Run every lab component in a supervised child process that is started
again whenever it stops, and let TaskManager leave its loop after
``live_time`` seconds so that its supervisor can restart it.
---
Review notes:
- RestartableProcess now accepts ``args``; the previous call sites passed
  ``target=``/``args=`` keywords that the constructor did not define.
- RestartableProcess.run() blocks forever, so launch_lab runs each
  supervisor in its own thread instead of calling run() back to back.
- ``live_time`` bounds the lifetime of a child (join timeout + terminate)
  instead of being a sleep after the child has already exited.
- TaskManager.run() no longer compares a float with ``None``.
- ``self.logger`` is kept in TaskManager.__init__; its removal was unrelated.
- Context lines were rebuilt from a whitespace-collapsed copy of the patch;
  regenerate with ``git format-patch`` before applying.

 alab_management/scripts/launch_lab.py        | 93 +++++++++++++-------
 alab_management/task_manager/task_manager.py |  9 +-
 2 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/alab_management/scripts/launch_lab.py b/alab_management/scripts/launch_lab.py
index d79b6929..a492ab13 100644
--- a/alab_management/scripts/launch_lab.py
+++ b/alab_management/scripts/launch_lab.py
@@ -11,6 +11,49 @@
 with contextlib.suppress(RuntimeError):
     multiprocessing.set_start_method("spawn")
 
+
+class RestartableProcess:
+    """Run a function in a child process and start it again whenever it stops.
+
+    Args:
+        target_func: Callable executed in the child process. It must be
+            picklable because the "spawn" start method is used.
+        args: Positional arguments passed to ``target_func``.
+        live_time: Maximum lifetime of one child in seconds. When it elapses the
+            child is terminated and replaced. ``None`` means no limit.
+    """
+
+    # Pause between restarts so a child that dies at startup cannot spin the CPU.
+    RESTART_DELAY = 1.0
+
+    def __init__(self, target_func, args=(), live_time=None):
+        self.target_func = target_func
+        self.args = args
+        self.live_time = live_time
+        self.process = None
+
+    def run(self):
+        """Supervise the child forever; blocks the calling thread."""
+        while True:
+            self.process = multiprocessing.Process(
+                target=self.target_func, args=self.args
+            )
+            self.process.start()
+            # Returns when the child exits or live_time elapses, whichever is first.
+            self.process.join(self.live_time)
+
+            name = self.process.name
+            if self.process.is_alive():
+                self.process.terminate()
+                self.process.join()
+                print(f"Process {name} reached its live time. Restarting...")
+            elif self.process.exitcode == 0:
+                print(f"Process {name} exited normally. Restarting...")
+            else:
+                code = self.process.exitcode
+                print(f"Process {name} exited with code {code}. Restarting...")
+            time.sleep(self.RESTART_DELAY)
+
 
 def launch_dashboard(host: str, port: int, debug: bool = False):
     """Launch the dashboard alone."""
@@ -43,7 +86,7 @@ def launch_task_manager():
     from alab_management.utils.module_ops import load_definition
 
     load_definition()
-    task_launcher = TaskManager()
+    task_launcher = TaskManager(live_time=3600)
     task_launcher.run()
 
 
@@ -79,32 +122,22 @@ def launch_lab(host, port, debug):
         )
         sys.exit(1)
 
-    dashboard_thread = Thread(target=launch_dashboard, args=(host, port, debug))
-    experiment_manager_thread = Thread(target=launch_experiment_manager)
-    task_launcher_thread = Thread(target=launch_task_manager)
-    device_manager_thread = Thread(target=launch_device_manager)
-    resource_manager_thread = Thread(target=launch_resource_manager)
-
-    dashboard_thread.daemon = experiment_manager_thread.daemon = (
-        task_launcher_thread.daemon
-    ) = device_manager_thread.daemon = resource_manager_thread.daemon = True
-
-    dashboard_thread.start()
-    device_manager_thread.start()
-    experiment_manager_thread.start()
-    task_launcher_thread.start()
-    resource_manager_thread.start()
-
-    while True:
-        time.sleep(1.5)
-        if not experiment_manager_thread.is_alive():
-            sys.exit(1001)
-
-        if not task_launcher_thread.is_alive():
-            sys.exit(1002)
-
-        if not dashboard_thread.is_alive():
-            sys.exit(1003)
-
-        if not device_manager_thread.is_alive():
-            sys.exit(1004)
+    # Supervise every component in its own restartable child process.
+    supervisors = [
+        # The dashboard is restarted every hour.
+        RestartableProcess(launch_dashboard, args=(host, port, debug), live_time=3600),
+        RestartableProcess(launch_experiment_manager),
+        RestartableProcess(launch_task_manager),
+        RestartableProcess(launch_device_manager),
+        RestartableProcess(launch_resource_manager),
+    ]
+
+    # RestartableProcess.run() blocks forever, so each supervisor gets its own
+    # thread; calling run() inline would only ever start the first component.
+    threads = [Thread(target=s.run, daemon=True) for s in supervisors]
+    for thread in threads:
+        thread.start()
+
+    # Block the main thread; the supervisors never return on their own.
+    for thread in threads:
+        thread.join()
diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py
index 107c60d2..026c41fb 100644
--- a/alab_management/task_manager/task_manager.py
+++ b/alab_management/task_manager/task_manager.py
@@ -22,16 +22,19 @@ class TaskManager:
     (2) handle all the resource requests
     """
 
-    def __init__(self):
+    def __init__(self, live_time: float | None = None):
         load_definition()
         self.task_view = TaskView()
         self.logger = DBLogger(task_id=None)
         super().__init__()
         time.sleep(1)  # allow some time for other modules to launch
+        # Seconds run() keeps looping before it returns; None means forever.
+        self.live_time = live_time
 
     def run(self):
-        """Start the loop."""
-        while True:
+        """Run the loop until ``live_time`` seconds have elapsed (forever if None)."""
+        start = time.monotonic()
+        while self.live_time is None or time.monotonic() - start < self.live_time:
             self._loop()
             time.sleep(1)
 