Skip to content

Commit

Permalink
Fixed launch_lab
Browse files Browse the repository at this point in the history
removed multiple multiprocessing, separated process to two: 3 internally timed processes and 1 dashboard
  • Loading branch information
bernardusrendy committed Jul 8, 2024
1 parent d50ba69 commit 9b99b4d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 61 deletions.
20 changes: 15 additions & 5 deletions alab_management/scripts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ def setup_lab_cli():
)
@click.option("-p", "--port", default="8895", type=int)
@click.option("--debug", default=False, is_flag=True)
def launch_lab_cli(host, port, debug):
@click.option("--live_time", default=10, type=float)
def launch_lab_cli(host, port, debug, live_time):
"""Start to run the lab."""
click.echo(f"The dashboard will be served on http://{host}:{port}")
launch_lab(host, port, debug)
click.echo(
f"The dashboard will be served on http://{host}:{port} with live time {live_time} seconds."
)
launch_lab(host, port, debug, live_time)


@cli.command(
Expand All @@ -80,12 +83,19 @@ def launch_worker_cli(ctx):
@click.option("-a", "--all-collections", is_flag=True, default=False)
@click.option("-f", "--_force_i_know_its_dangerous", is_flag=True, default=False)
@click.option("--database_name", default="Alab_sim")
@click.option("--remove_versions", is_flag=True, default=False)
def cleanup_lab_cli(
all_collections: bool, _force_i_know_its_dangerous: bool, database_name: str
all_collections: bool,
_force_i_know_its_dangerous: bool,
database_name: str,
remove_versions: bool,
):
"""Clean up the database."""
if cleanup_lab(
all_collections, _force_i_know_its_dangerous, database_name=database_name
all_collections,
_force_i_know_its_dangerous,
database_name=database_name,
remove_versions=remove_versions,
):
click.echo("Done")
else:
Expand Down
142 changes: 86 additions & 56 deletions alab_management/scripts/launch_lab.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""The script to launch task_view and executor, which are the core of the system."""

import contextlib
import importlib
import multiprocessing
import sys
import time
from threading import Thread

from gevent.pywsgi import WSGIServer # type: ignore

import alab_management.dashboard
from alab_management.utils.versioning import get_version

with contextlib.suppress(RuntimeError):
multiprocessing.set_start_method("spawn")

Expand All @@ -18,44 +21,56 @@
class RestartableProcess:
"""A class for creating processes that can be automatically restarted after failures."""

def __init__(self, target, args=(), live_time=None, termination_event=None):
def __init__(self, target, args=(), termination_event=None):
self.target = target
self.live_time = live_time
self.process = None
self.args = args
self.termination_event = termination_event or multiprocessing.Event()

def run(self):
start = time.time()
while not self.termination_event.is_set() and (
self.live_time is None or time.time() - start < self.live_time
):
try:
process = multiprocessing.Process(target=self.target, args=self.args)
process.start()
process.join() # Wait for process to finish

# Check exit code, handle errors, and restart if needed
if process.exitcode == 0:
print(f"Process {process.name} exited normally. Restarting...")
"""Start the process."""
try:
self.process = multiprocessing.Process(target=self.target, args=self.args)
self.process.daemon = True
self.process.start()
except Exception as e:
print(f"Error occurred while running process: {e}")

def join(self):
"""Join the process."""
try:
if self.process:
self.process.join()
except Exception as e:
print(f"Error occurred while joining process: {e}")

def check_alive(self):
"""Check if the process is alive."""
# Check exit code, handle errors, and restart if needed
is_alive = self.process.is_alive() if self.process is not None else False
if not is_alive:
if self.process is not None:
if self.process.exitcode == 0:
print(f"Process {self.process.name} exited normally. Restarting...")
else:
print(
f"Process {process.name} exited with code {process.exitcode}."
f"Process {self.process.name} exited with code {self.process.exitcode}."
)
except Exception as e:
print(f"Error occurred while running process: {e}")

# Check for termination before restarting
if self.termination_event.is_set():
break
return False, self.process.exitcode
else:
return False, None
return True, None

time.sleep(
self.live_time or 0
) # Restart after live_time or immediately if None
def stop(self):
"""Stop the process."""
if self.process:
self.process.terminate()
self.process.join()


def launch_dashboard(host: str, port: int, debug: bool = False):
def launch_dashboard(host: str, port: int, debug: bool = False, live_time: float = 10):
"""Launch the dashboard alone."""
importlib.reload(alab_management.dashboard)
from alab_management.dashboard import create_app

if debug:
Expand All @@ -69,41 +84,43 @@ def launch_dashboard(host: str, port: int, debug: bool = False):
server.serve_forever()


def launch_experiment_manager():
def launch_experiment_manager(live_time: float):
"""Launch the experiment manager."""
from alab_management.experiment_manager import ExperimentManager
from alab_management.utils.module_ops import load_definition

load_definition()
load_definition(get_version())
experiment_manager = ExperimentManager(
live_time=3600, termination_event=termination_event
live_time=live_time, termination_event=termination_event
)
experiment_manager.run()


def launch_task_manager():
def launch_task_manager(live_time: float):
"""Launch the task manager."""
from alab_management.task_manager.task_manager import TaskManager
from alab_management.utils.module_ops import load_definition

load_definition()
task_launcher = TaskManager(live_time=3600, termination_event=termination_event)
load_definition(get_version())
task_launcher = TaskManager(
live_time=live_time, termination_event=termination_event
)
task_launcher.run()


def launch_resource_manager():
def launch_resource_manager(live_time: float):
"""Launch the resource manager."""
from alab_management.resource_manager.resource_manager import ResourceManager
from alab_management.utils.module_ops import load_definition

load_definition()
load_definition(get_version())
resource_manager = ResourceManager(
live_time=3600, termination_event=termination_event
live_time=live_time, termination_event=termination_event
)
resource_manager.run()


def launch_lab(host, port, debug):
def launch_lab(host, port, debug, live_time: float):
"""Start to run the lab."""
from alab_management.device_view import DeviceView

Expand All @@ -118,42 +135,55 @@ def launch_lab(host, port, debug):
# Create RestartableProcess objects for each process with shared termination_event
dashboard_process = RestartableProcess(
target=launch_dashboard,
args=(host, port, debug),
live_time=3600,
args=(host, port, debug, live_time),
termination_event=termination_event,
)
experiment_manager_process = RestartableProcess(
target=launch_experiment_manager,
args=(host, port, debug),
live_time=3600,
args=([live_time]),
termination_event=termination_event,
)
task_launcher_process = RestartableProcess(
target=launch_task_manager,
args=(host, port, debug),
live_time=3600,
args=([live_time]),
termination_event=termination_event,
)
resource_manager_process = RestartableProcess(
target=launch_resource_manager,
args=(host, port, debug),
live_time=3600,
args=([live_time]),
termination_event=termination_event,
)


# Start the processes using multiprocessing
processes = [dashboard_process, experiment_manager_process, task_launcher_process, resource_manager_process]


jobs = []
# Start the internally timed processes using multiprocessing
processes = [
experiment_manager_process,
task_launcher_process,
resource_manager_process,
]
# Start the processes using multiprocessing and run them until termination_event is set
while not termination_event.is_set():
any_alive = False
for process in processes:
print(not process.check_alive()[0])
if process.check_alive()[0]:
any_alive = True
break
if not any_alive:
if dashboard_process.check_alive()[0]:
dashboard_process.stop()
for process in processes:
process.run()
dashboard_process.run()
else:
if process.check_alive()[1] not in [0, None]:
raise Exception(
f"Process {process.process.name} exited with code {process.check_alive()[1]}"
)
time.sleep(0.1)

# Join all processes before exiting
for process in processes:
p = multiprocessing.Process(target=process.run)
p.start()
jobs.append(p)

return jobs

process.join()


def terminate_all_processes():
Expand Down

0 comments on commit 9b99b4d

Please sign in to comment.