diff --git a/alab_management/experiment_manager.py b/alab_management/experiment_manager.py index 9dcff3d5..2b1a6059 100644 --- a/alab_management/experiment_manager.py +++ b/alab_management/experiment_manager.py @@ -6,6 +6,7 @@ done. """ +import multiprocessing import time from typing import Any @@ -23,7 +24,7 @@ class ExperimentManager: and submit the experiment to executor and flag the completed experiments. """ - def __init__(self): + def __init__(self, live_time: float | None = None, termination_event=None): self.experiment_view = ExperimentView() self.task_view = TaskView() self.sample_view = SampleView() @@ -36,6 +37,9 @@ def __init__(self): if self.__copy_to_completed_db: self.completed_experiment_view = CompletedExperimentView() + self.live_time = live_time + self.termination_event = termination_event or multiprocessing.Event() + def run(self): """Start the event loop.""" self.logger.system_log( @@ -45,7 +49,8 @@ def run(self): "type": "ExperimentManagerStarted", }, ) - while True: + start = time.time() + while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): self._loop() time.sleep(1) diff --git a/alab_management/resource_manager/resource_manager.py b/alab_management/resource_manager/resource_manager.py index 6ec6add1..dca0b891 100644 --- a/alab_management/resource_manager/resource_manager.py +++ b/alab_management/resource_manager/resource_manager.py @@ -3,6 +3,7 @@ which actually executes the tasks. """ +import multiprocessing import time from datetime import datetime from traceback import format_exc @@ -34,7 +35,7 @@ class ResourceManager(RequestMixin): (2) handle all the resource requests """ - def __init__(self): + def __init__(self, live_time: float | None = None, termination_event=None): load_definition() self.task_view = TaskView() self.sample_view = SampleView() @@ -44,10 +45,13 @@ def __init__(self): self.logger = DBLogger(task_id=None) super().__init__() time.sleep(1) # allow some time for other modules to launch + self.live_time = live_time + self.termination_event = termination_event or multiprocessing.Event() def run(self): """Start the loop.""" - while True: + start = time.time() + while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): self._loop() time.sleep(0.5) diff --git a/alab_management/scripts/launch_lab.py b/alab_management/scripts/launch_lab.py index 19af3e20..1e998fda 100644 --- a/alab_management/scripts/launch_lab.py +++ b/alab_management/scripts/launch_lab.py @@ -4,39 +4,34 @@ import multiprocessing import sys import time -from multiprocessing import Process - +from threading import Thread from gevent.pywsgi import WSGIServer # type: ignore - +from multiprocessing import Process with contextlib.suppress(RuntimeError): multiprocessing.set_start_method("spawn") - class RestartableProcess: """A class for creating processes that can be automatically restarted after failures.""" - def __init__(self, target_func, live_time=None): self.target_func = target_func self.live_time = live_time self.process = None + self.termination_event = termination_event or multiprocessing.Event() def run(self): - while True: - self.process = Process(target=self.target_func) - self.process.start() - self.process.join() # Wait for process to finish + start = time.time() + while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): + try: + process = multiprocessing.Process(target=self.target, args=self.args) + process.start() + process.join() # Wait for process to finish # Check exit code and restart if needed if self.process.exitcode == 0: print(f"Process {self.process.name} exited normally. Restarting...") else: - print( - f"Process {self.process.name} exited with code {self.process.exitcode}." - ) - time.sleep( - self.live_time or 0 - ) # Restart after live_time or immediately if None - + print(f"Process {self.process.name} exited with code {self.process.exitcode}.") + time.sleep(self.live_time or 0) # Restart after live_time or immediately if None def launch_dashboard(host: str, port: int, debug: bool = False): """Launch the dashboard alone.""" @@ -59,7 +54,7 @@ def launch_experiment_manager(): from alab_management.utils.module_ops import load_definition load_definition() - experiment_manager = ExperimentManager() + experiment_manager = ExperimentManager(live_time=3600, termination_event=termination_event) experiment_manager.run() @@ -69,17 +64,27 @@ def launch_task_manager(): from alab_management.utils.module_ops import load_definition load_definition() - task_launcher = TaskManager(live_time=3600) + task_launcher = TaskManager(live_time=3600, termination_event=termination_event) task_launcher.run() +def launch_device_manager(): + """Launch the device manager.""" + from alab_management.device_manager import DeviceManager + from alab_management.utils.module_ops import load_definition + + load_definition() + device_manager = DeviceManager() + device_manager.run() + + def launch_resource_manager(): """Launch the resource manager.""" from alab_management.resource_manager.resource_manager import ResourceManager from alab_management.utils.module_ops import load_definition load_definition() - resource_manager = ResourceManager() + resource_manager = ResourceManager(live_time=3600, termination_event=termination_event) resource_manager.run() @@ -96,19 +101,21 @@ def launch_lab(host, port, debug): sys.exit(1) # Create RestartableProcess objects for each process - dashboard_process = RestartableProcess( - target=launch_dashboard, args=(host, port, debug), live_time=3600 - ) # Restart every hour + dashboard_process = RestartableProcess(target=launch_dashboard, args=(host, port, debug), live_time=3600) # Restart every hour experiment_manager_process = RestartableProcess(target=launch_experiment_manager) task_launcher_process = RestartableProcess(target=launch_task_manager) + device_manager_process = RestartableProcess(target=launch_device_manager) resource_manager_process = RestartableProcess(target=launch_resource_manager) # Start the processes dashboard_process.run() experiment_manager_process.run() task_launcher_process.run() + device_manager_process.run() resource_manager_process.run() - """With RestartableProcess, each process is designed to handle restarts automatically. - So, there's no need to worry about the program exiting before background tasks finish - - they will be restarted by RestartableProcess if necessary.""" + return threads + +def terminate_all_processes(): + """Set the termination event to stop all processes.""" + termination_event.set() diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index 026c41fb..11d4058c 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -3,6 +3,7 @@ which actually executes the tasks. """ +import multiprocessing import time from dramatiq_abort import abort, abort_requested @@ -22,18 +23,19 @@ class TaskManager: (2) handle all the resource requests """ - def __init__(self, live_time: float | None = None): + def __init__(self, live_time: float | None = None, termination_event=None): load_definition() self.task_view = TaskView() self.logger = DBLogger(task_id=None) super().__init__() time.sleep(1) # allow some time for other modules to launch self.live_time = live_time + self.termination_event = termination_event or multiprocessing.Event() def run(self): """Start the loop.""" start = time.time() - while (time.time() - start) < self.live_time: + while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time): self._loop() time.sleep(1)