Skip to content

Commit

Permalink
step 3: implement process restart for alabos
Browse files Browse the repository at this point in the history
  • Loading branch information
odartsi committed Jul 2, 2024
1 parent 1cadb71 commit d8cb90a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
70 changes: 38 additions & 32 deletions alab_management/scripts/launch_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,30 @@
import sys
import time
from threading import Thread

from gevent.pywsgi import WSGIServer # type: ignore

from multiprocessing import Process
with contextlib.suppress(RuntimeError):
multiprocessing.set_start_method("spawn")

class RestartableProcess:
    """Supervise a callable in a child process and relaunch it whenever it exits.

    Every launch uses a fresh ``multiprocessing.Process``. :meth:`run` is a
    blocking supervision loop that never returns on its own; use
    :meth:`start` when several supervised processes must run side by side.

    Args:
        target_func: Callable executed in the child process. It may instead
            be given as the keyword ``target`` (same spelling as
            ``Process``/``Thread``).
        live_time: Seconds to pause after the child exits before launching it
            again. ``None`` (or ``0``) relaunches immediately.
        target: Keyword-only alias for ``target_func``.
        args: Positional arguments forwarded to the callable.
        kwargs: Keyword arguments forwarded to the callable.

    Raises:
        TypeError: If no callable is supplied, or if both ``target_func`` and
            ``target`` are supplied.
    """

    def __init__(
        self,
        target_func=None,
        live_time=None,
        *,
        target=None,
        args=(),
        kwargs=None,
    ):
        if target_func is not None and target is not None:
            raise TypeError("pass either target_func or target, not both")
        func = target_func if target_func is not None else target
        if not callable(func):
            raise TypeError("a callable target is required")

        self.target_func = func
        self.args = tuple(args)
        self.kwargs = dict(kwargs or {})
        self.live_time = live_time
        self.process = None  # most recently launched child, if any

    def run(self):
        """Launch the child and keep relaunching it. Blocks forever."""
        while True:
            self.process = Process(
                target=self.target_func, args=self.args, kwargs=self.kwargs
            )
            self.process.start()
            self.process.join()  # Wait for process to finish

            # Report how the child ended; it is relaunched in either case.
            if self.process.exitcode == 0:
                print(f"Process {self.process.name} exited normally. Restarting...")
            else:
                print(f"Process {self.process.name} exited with code {self.process.exitcode}.")
            # Pause before the next launch (no pause when live_time is None).
            time.sleep(self.live_time or 0)

    def start(self):
        """Run the supervision loop in a daemon thread and return that thread.

        Unlike :meth:`run`, this returns immediately, so several supervised
        processes can be started one after another. The thread is a daemon:
        the caller must keep the main thread alive for supervision to go on.
        """
        supervisor = Thread(target=self.run, daemon=True)
        supervisor.start()
        return supervisor

def launch_dashboard(host: str, port: int, debug: bool = False):
"""Launch the dashboard alone."""
Expand Down Expand Up @@ -43,7 +61,7 @@ def launch_task_manager():
from alab_management.utils.module_ops import load_definition

load_definition()
task_launcher = TaskManager()
task_launcher = TaskManager(live_time=3600)
task_launcher.run()


Expand Down Expand Up @@ -79,32 +97,20 @@ def launch_lab(host, port, debug):
)
sys.exit(1)

dashboard_thread = Thread(target=launch_dashboard, args=(host, port, debug))
experiment_manager_thread = Thread(target=launch_experiment_manager)
task_launcher_thread = Thread(target=launch_task_manager)
device_manager_thread = Thread(target=launch_device_manager)
resource_manager_thread = Thread(target=launch_resource_manager)

dashboard_thread.daemon = experiment_manager_thread.daemon = (
task_launcher_thread.daemon
) = device_manager_thread.daemon = resource_manager_thread.daemon = True

dashboard_thread.start()
device_manager_thread.start()
experiment_manager_thread.start()
task_launcher_thread.start()
resource_manager_thread.start()

while True:
time.sleep(1.5)
if not experiment_manager_thread.is_alive():
sys.exit(1001)

if not task_launcher_thread.is_alive():
sys.exit(1002)

if not dashboard_thread.is_alive():
sys.exit(1003)

if not device_manager_thread.is_alive():
sys.exit(1004)
# Create RestartableProcess objects for each process
dashboard_process = RestartableProcess(target=launch_dashboard, args=(host, port, debug), live_time=3600) # Restart every hour
experiment_manager_process = RestartableProcess(target=launch_experiment_manager)
task_launcher_process = RestartableProcess(target=launch_task_manager)
device_manager_process = RestartableProcess(target=launch_device_manager)
resource_manager_process = RestartableProcess(target=launch_resource_manager)

# Start the processes
dashboard_process.run()
experiment_manager_process.run()
task_launcher_process.run()
device_manager_process.run()
resource_manager_process.run()

"""With RestartableProcess, each process is designed to handle restarts automatically.
So, there's no need to worry about the program exiting before background tasks finish -
they will be restarted by RestartableProcess if necessary."""
7 changes: 4 additions & 3 deletions alab_management/task_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ class TaskManager:
(2) handle all the resource requests
"""

# Set up the task manager: load the definitions, create the task view and the
# logger, pause briefly, then record the optional run-time limit.
# NOTE(review): two signature lines appear below; this looks like diff residue
# (old header followed by its replacement) -- confirm against the real file.
def __init__(self):
def __init__(self, live_time: float | None = None):
load_definition()
self.task_view = TaskView()

self.logger = DBLogger(task_id=None)
super().__init__()
time.sleep(1) # allow some time for other modules to launch
# Kept for run(), which compares the elapsed time against this value.
self.live_time = live_time

def run(self):
    """Start the loop.

    Calls ``self._loop()`` and then sleeps one second, repeatedly. The loop
    ends once ``self.live_time`` seconds have elapsed (checked before each
    iteration); when ``self.live_time`` is ``None`` there is no deadline and
    the loop runs until it is interrupted.
    """
    started = time.monotonic()  # monotonic: immune to wall-clock adjustments
    # ``None`` means "no deadline". Comparing a float against None would
    # raise TypeError on the very first check, so test for it explicitly.
    while self.live_time is None or time.monotonic() - started < self.live_time:
        self._loop()
        time.sleep(1)

Expand Down

0 comments on commit d8cb90a

Please sign in to comment.