Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/75-alabos-hot-restart' into 75-a…
Browse files Browse the repository at this point in the history
…labos-hot-restart

# Conflicts:
#	alab_management/device_manager.py
#	alab_management/scripts/launch_lab.py
  • Loading branch information
idocx committed Jul 3, 2024
2 parents 1f2fb97 + bf53041 commit fdb59e3
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 31 deletions.
9 changes: 7 additions & 2 deletions alab_management/experiment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
done.
"""

import multiprocessing
import time
from typing import Any

Expand All @@ -23,7 +24,7 @@ class ExperimentManager:
and submit the experiment to executor and flag the completed experiments.
"""

def __init__(self):
def __init__(self, live_time: float | None = None, termination_event=None):
self.experiment_view = ExperimentView()
self.task_view = TaskView()
self.sample_view = SampleView()
Expand All @@ -36,6 +37,9 @@ def __init__(self):
if self.__copy_to_completed_db:
self.completed_experiment_view = CompletedExperimentView()

self.live_time = live_time
self.termination_event = termination_event or multiprocessing.Event()

def run(self):
"""Start the event loop."""
self.logger.system_log(
Expand All @@ -45,7 +49,8 @@ def run(self):
"type": "ExperimentManagerStarted",
},
)
while True:
start = time.time()
while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time):
self._loop()
time.sleep(1)

Expand Down
8 changes: 6 additions & 2 deletions alab_management/resource_manager/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
which actually executes the tasks.
"""

import multiprocessing
import time
from datetime import datetime
from traceback import format_exc
Expand Down Expand Up @@ -34,7 +35,7 @@ class ResourceManager(RequestMixin):
(2) handle all the resource requests
"""

def __init__(self):
def __init__(self, live_time: float | None = None, termination_event=None):
load_definition()
self.task_view = TaskView()
self.sample_view = SampleView()
Expand All @@ -44,10 +45,13 @@ def __init__(self):
self.logger = DBLogger(task_id=None)
super().__init__()
time.sleep(1) # allow some time for other modules to launch
self.live_time = live_time
self.termination_event = termination_event or multiprocessing.Event()

def run(self):
"""Start the loop."""
while True:
start = time.time()
while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time):
self._loop()
time.sleep(0.5)

Expand Down
57 changes: 32 additions & 25 deletions alab_management/scripts/launch_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,34 @@
import multiprocessing
import sys
import time
from multiprocessing import Process

from threading import Thread
from gevent.pywsgi import WSGIServer # type: ignore

from multiprocessing import Process
with contextlib.suppress(RuntimeError):
multiprocessing.set_start_method("spawn")


class RestartableProcess:
"""A class for creating processes that can be automatically restarted after failures."""

def __init__(self, target_func, live_time=None):
self.target_func = target_func
self.live_time = live_time
self.process = None
self.termination_event = termination_event or multiprocessing.Event()

def run(self):
while True:
self.process = Process(target=self.target_func)
self.process.start()
self.process.join() # Wait for process to finish
start = time.time()
while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time):
try:
process = multiprocessing.Process(target=self.target, args=self.args)
process.start()
process.join() # Wait for process to finish

# Check exit code and restart if needed
if self.process.exitcode == 0:
print(f"Process {self.process.name} exited normally. Restarting...")
else:
print(
f"Process {self.process.name} exited with code {self.process.exitcode}."
)
time.sleep(
self.live_time or 0
) # Restart after live_time or immediately if None

print(f"Process {self.process.name} exited with code {self.process.exitcode}.")
time.sleep(self.live_time or 0) # Restart after live_time or immediately if None

def launch_dashboard(host: str, port: int, debug: bool = False):
"""Launch the dashboard alone."""
Expand All @@ -59,7 +54,7 @@ def launch_experiment_manager():
from alab_management.utils.module_ops import load_definition

load_definition()
experiment_manager = ExperimentManager()
experiment_manager = ExperimentManager(live_time=3600, termination_event=termination_event)
experiment_manager.run()


Expand All @@ -69,17 +64,27 @@ def launch_task_manager():
from alab_management.utils.module_ops import load_definition

load_definition()
task_launcher = TaskManager(live_time=3600)
task_launcher = TaskManager(live_time=3600, termination_event=termination_event)
task_launcher.run()


def launch_device_manager():
"""Launch the device manager."""
from alab_management.device_manager import DeviceManager
from alab_management.utils.module_ops import load_definition

load_definition()
device_manager = DeviceManager()
device_manager.run()


def launch_resource_manager():
"""Launch the resource manager."""
from alab_management.resource_manager.resource_manager import ResourceManager
from alab_management.utils.module_ops import load_definition

load_definition()
resource_manager = ResourceManager()
resource_manager = ResourceManager(live_time=3600, termination_event=termination_event)
resource_manager.run()


Expand All @@ -96,19 +101,21 @@ def launch_lab(host, port, debug):
sys.exit(1)

# Create RestartableProcess objects for each process
dashboard_process = RestartableProcess(
target=launch_dashboard, args=(host, port, debug), live_time=3600
) # Restart every hour
dashboard_process = RestartableProcess(target=launch_dashboard, args=(host, port, debug), live_time=3600) # Restart every hour
experiment_manager_process = RestartableProcess(target=launch_experiment_manager)
task_launcher_process = RestartableProcess(target=launch_task_manager)
device_manager_process = RestartableProcess(target=launch_device_manager)
resource_manager_process = RestartableProcess(target=launch_resource_manager)

# Start the processes
dashboard_process.run()
experiment_manager_process.run()
task_launcher_process.run()
device_manager_process.run()
resource_manager_process.run()

"""With RestartableProcess, each process is designed to handle restarts automatically.
So, there's no need to worry about the program exiting before background tasks finish -
they will be restarted by RestartableProcess if necessary."""
return threads

def terminate_all_processes():
"""Set the termination event to stop all processes."""
termination_event.set()
6 changes: 4 additions & 2 deletions alab_management/task_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
which actually executes the tasks.
"""

import multiprocessing
import time

from dramatiq_abort import abort, abort_requested
Expand All @@ -22,18 +23,19 @@ class TaskManager:
(2) handle all the resource requests
"""

def __init__(self, live_time: float | None = None):
def __init__(self, live_time: float | None = None, termination_event=None):
load_definition()
self.task_view = TaskView()
self.logger = DBLogger(task_id=None)
super().__init__()
time.sleep(1) # allow some time for other modules to launch
self.live_time = live_time
self.termination_event = termination_event or multiprocessing.Event()

def run(self):
"""Start the loop."""
start = time.time()
while (time.time() - start) < self.live_time:
while not self.termination_event.is_set() and (self.live_time is None or time.time() - start < self.live_time):
self._loop()
time.sleep(1)

Expand Down

0 comments on commit fdb59e3

Please sign in to comment.