From 35961c31fe1a35bbe3a313f0fde91b6b48606430 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sat, 27 Apr 2024 12:03:47 -0700 Subject: [PATCH 01/27] temp comit --- .../dashboard/routes/experiment.py | 2 +- alab_management/dashboard/routes/task.py | 2 +- alab_management/task_actor.py | 75 ++--- alab_management/task_manager/task_manager.py | 302 ++---------------- alab_management/task_view/task_enums.py | 10 +- alab_management/task_view/task_view.py | 100 +++--- alab_management/utils/data_objects.py | 31 +- 7 files changed, 130 insertions(+), 392 deletions(-) diff --git a/alab_management/dashboard/routes/experiment.py b/alab_management/dashboard/routes/experiment.py index ae94ac97..694d1cc0 100644 --- a/alab_management/dashboard/routes/experiment.py +++ b/alab_management/dashboard/routes/experiment.py @@ -188,7 +188,7 @@ def cancel_experiment(exp_id: str): # tasks = experiment_view.get_experiment(exp_id)["tasks"] for task in tasks: - task_view.mark_task_as_cancelling(task["task_id"]) + task_view.mark_task_as_canceling(task["task_id"]) except Exception as e: return {"status": "error", "reason": e.args[0]}, 400 else: diff --git a/alab_management/dashboard/routes/task.py b/alab_management/dashboard/routes/task.py index 562b5149..a9b26754 100644 --- a/alab_management/dashboard/routes/task.py +++ b/alab_management/dashboard/routes/task.py @@ -12,7 +12,7 @@ def cancel_task(task_id: str): """API to cancel a task.""" try: task_id_obj: ObjectId = ObjectId(task_id) - task_view.mark_task_as_cancelling(task_id_obj) + task_view.mark_task_as_canceling(task_id_obj) return {"status": "success"} except Exception as exception: diff --git a/alab_management/task_actor.py b/alab_management/task_actor.py index 830cf0d6..8e1fb3ef 100644 --- a/alab_management/task_actor.py +++ b/alab_management/task_actor.py @@ -36,7 +36,7 @@ class ParameterError(Exception): max_retries=0, time_limit=30 * 24 * 60 * 60 * 1000, notify_shutdown=True, -) # TODO time limit is set in ms. currently set to 30 days +) # time limit is set in ms. currently set to 30 days def run_task(task_id_str: str): """Submit a task. In this system, each task is run in an independent process, which will try to acquire device and @@ -129,62 +129,45 @@ def run_task(task_id_str: str): # from Alab_one, for eg: Powder dosing. Powder dosing class will have a method "run". result = task.run() except Abort: - task_view.update_status(task_id=task_id, status=TaskStatus.CANCELLED) + # task_view.update_status(task_id=task_id, status=TaskStatus.CANCELLED) task_view.set_message( task_id=task_id, message="Task was cancelled due to the abort signal" ) # display exception on the dashboard - logger.system_log( - level="ERROR", - log_data={ - "logged_by": "TaskActor", - "type": "TaskEnd", - "task_id": task_id, - "task_type": task_type.__name__, - "status": TaskStatus.CANCELLED.name, - "traceback": "Task was cancelled due to the abort signal", - }, - ) - lab_view.request_cleanup() - except Shutdown: - task_view.update_status(task_id=task_id, status=TaskStatus.STOPPED) - task_view.set_message( - task_id=task_id, message="Task was cancelled due to the worker shutdown" - ) # display exception on the dashboard - logger.system_log( - level="ERROR", - log_data={ - "logged_by": "TaskActor", - "type": "TaskEnd", - "task_id": task_id, - "task_type": task_type.__name__, - "status": TaskStatus.STOPPED.name, - "traceback": "Task was cancelled due to the worker shutdown", - }, - ) - lab_view.request_cleanup() + # logger.system_log( + # level="ERROR", + # log_data={ + # "logged_by": "TaskActor", + # "type": "TaskEnd", + # "task_id": task_id, + # "task_type": task_type.__name__, + # "status": TaskStatus.CANCELLED.name, + # "traceback": "Task was cancelled due to the abort signal", + # }, + # ) + # lab_view.request_cleanup() except Exception: - task_view.update_status(task_id=task_id, status=TaskStatus.ERROR) + # task_view.update_status(task_id=task_id, status=TaskStatus.ERROR) formatted_exception = format_exc() task_view.set_message( task_id=task_id, message=formatted_exception ) # display exception on the dashboard - logger.system_log( - level="ERROR", - log_data={ - "logged_by": "TaskActor", - "type": "TaskEnd", - "task_id": task_id, - "task_type": task_type.__name__, - "status": "ERROR", - "traceback": formatted_exception, - }, - ) - lab_view.request_cleanup() + # logger.system_log( + # level="ERROR", + # log_data={ + # "logged_by": "TaskActor", + # "type": "TaskEnd", + # "task_id": task_id, + # "task_type": task_type.__name__, + # "status": "ERROR", + # "traceback": formatted_exception, + # }, + # ) + # lab_view.request_cleanup() raise else: - task_view.update_status(task_id=task_id, status=TaskStatus.COMPLETED) + # task_view.update_status(task_id=task_id, status=TaskStatus.COMPLETED) if result is None: - pass + ... elif isinstance(result, dict): for key, value in result.items(): # we do this per item to avoid overwriting existing results. Its possible that some results were diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index 832b27dc..375bc722 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -5,97 +5,27 @@ import time from datetime import datetime -from functools import partial -from math import inf -from threading import Thread from typing import Any, cast import dill -import networkx as nx from bson import ObjectId from dramatiq_abort import abort -from alab_management.device_view import BaseDevice, get_all_devices from alab_management.device_view.device_view import DeviceView from alab_management.lab_view import LabView -from alab_management.logger import DBLogger, LoggingLevel +from alab_management.logger import DBLogger from alab_management.sample_view.sample import SamplePosition from alab_management.sample_view.sample_view import SamplePositionRequest, SampleView from alab_management.task_actor import run_task -from alab_management.task_view import TaskPriority, TaskView -from alab_management.task_view.task import BaseTask -from alab_management.task_view.task_enums import TaskStatus -from alab_management.utils.data_objects import get_collection -from alab_management.utils.module_ops import load_definition - -from .enums import _EXTRA_REQUEST -from .resource_requester import ( +from alab_management.task_manager.enums import _EXTRA_REQUEST +from alab_management.task_manager.resource_requester import ( RequestMixin, RequestStatus, ) - - -def parse_reroute_tasks() -> dict[str, type[BaseTask]]: - """ - Takes the reroute task registry and expands the supported sample positions (which is given in format similar to - resource requests) to the individual sample positions. - - Raises - ------ - ValueError: if the supported_sample_positions is not provided in the correct format. - - Returns - ------- - _type_: _description_ - """ - from alab_management.sample_view import SampleView - from alab_management.task_view.task import _reroute_task_registry - - # return [] - - load_definition() - - routes: dict[str, BaseTask] = {} # sample_position: Task - sample_view = SampleView() - - for reroute in _reroute_task_registry: - route_task = partial(reroute["task"], **reroute["kwargs"]) - supported_sample_positions = reroute["supported_sample_positions"] - - for device_identifier, positions in supported_sample_positions.items(): - if device_identifier is None: - devices = [None] - elif isinstance(device_identifier, str): - devices = [device_identifier] # name of particular device - elif issubclass(device_identifier, BaseDevice): - devices = [ - name - for name, device_instance in get_all_devices().items() - if isinstance(device_instance, device_identifier) - ] # all devices of this type - else: - raise ValueError( - "device must be a name of a specific device, a class of type BaseDevice, or None" - ) - - if isinstance(positions, str): - positions = [positions] # noqa: PLW2901 - for device in devices: - for position in positions: - if device is None and position == "": - raise ValueError( - 'Cannot have device=None and position="" -- this would return every sample_position!' - ) - if device is not None: - position = f"{device}{SamplePosition.SEPARATOR}{position}" # noqa: PLW2901 - for found_position in sample_view._sample_positions_collection.find( - {"name": {"$regex": position}} - ): # DB_ACCESS_OUTSIDE_VIEW - routes[found_position["name"]] = route_task - return routes - - -_reroute_registry = parse_reroute_tasks() +from alab_management.task_view import TaskView +from alab_management.task_view.task_enums import TaskStatus +from alab_management.utils.data_objects import get_collection +from alab_management.utils.module_ops import load_definition class TaskManager(RequestMixin): @@ -108,13 +38,10 @@ class TaskManager(RequestMixin): def __init__(self): load_definition() - self.task_view = TaskView() + self.task_view = TaskView(allow_update_status=True) self.sample_view = SampleView() self.device_view = DeviceView() self._request_collection = get_collection("requests") - self.__reroute_in_progress = False - - self.__skip_checking_task_id = False self.logger = DBLogger(task_id=None) super().__init__() @@ -124,16 +51,13 @@ def run(self): """Start the loop.""" while True: self._loop() - time.sleep(2) + time.sleep(1) def _loop(self): - self.submit_ready_tasks() + self.handle_tasks_to_be_canceled() self.handle_released_resources() - self.handle_tasks_to_be_cancelled() self.handle_requested_resources() - - if not self.__reroute_in_progress: - self.handle_request_cycles() + self.submit_ready_tasks() def _clean_up_tasks_from_previous_runs(self): """Cleans up incomplete tasks that exist from the last time the taskmanager was running. Note that this will @@ -145,7 +69,6 @@ def _clean_up_tasks_from_previous_runs(self): """ statuses_to_cancel = [ TaskStatus.RUNNING, - TaskStatus.CANCELLING, TaskStatus.REQUESTING_RESOURCES, ] tasks_to_cancel = [] @@ -215,22 +138,15 @@ def submit_ready_tasks(self): self.task_view.update_status( task_id=task_entry["task_id"], status=TaskStatus.INITIATED ) - result = run_task.send(task_id_str=str(task_entry["task_id"])) + result = run_task.send_with_options(kwargs={"task_id_str": str(task_entry["task_id"])}) message_id = result.message_id self.task_view.set_task_actor_id( task_id=task_entry["task_id"], message_id=message_id ) - def handle_tasks_to_be_cancelled(self): - """ - Check if there are any tasks that are in CANCELLING status. If so, cancel them. - - This is done by sending a dramatiq abort message to the task actor process. The task actor process will - update the status from CANCELLING to CANCELLED. - """ - tasks_to_be_cancelled = self.task_view.get_tasks_by_status( - status=TaskStatus.CANCELLING - ) + def handle_tasks_to_be_canceled(self): + """Check if there are any tasks needs to be stopped.""" + tasks_to_be_cancelled = self.task_view.get_tasks_to_be_canceled() for task_entry in tasks_to_be_cancelled: self.logger.system_log( @@ -242,15 +158,14 @@ def handle_tasks_to_be_cancelled(self): "task_actor_id": task_entry.get("task_actor_id", None), }, ) - message_id = task_entry.get("task_actor_id", None) - if message_id is not None: + if (message_id := task_entry.get("task_actor_id", None)) is not None: abort(message_id=message_id) - # updating the status from CANCELLING to CANCELLED will be executed in task actor process - else: - self.task_view.update_status( - task_id=task_entry["task_id"], - status=TaskStatus.CANCELLED, - ) + + # even if the task is not running, we will mark it as cancelled + self.task_view.update_status( + task_id=task_entry["task_id"], + status=TaskStatus.CANCELLED, + ) def handle_released_resources(self): """Release the resources.""" @@ -275,39 +190,20 @@ def handle_requested_resources(self): for request in requests: self._handle_requested_resources(request) - def handle_request_cycles(self): - """Check for request cycles (gridlocks where a set of tasks require sample_positions occupied by one - another.). We attempt to resolve these cycles by moving a sample out of the way by a reroute_task defined in - the alab configuration. This will move samples to free up the blocked task of highest priority. If this alone - does not resolve the cycle, we will try again on the next call to this method. - """ - positions_to_reroute, taskid_to_reroute = self._check_for_request_cycle() - if len(positions_to_reroute) > 0: - thread = Thread( - target=self._reroute_to_fix_request_cycle, - kwargs={ - "task_id": taskid_to_reroute, - "sample_positions": positions_to_reroute, - }, - ) - thread.daemon = False - thread.start() - def _handle_requested_resources(self, request_entry: dict[str, Any]): try: resource_request = request_entry["request"] task_id = request_entry["task_id"] - if not self.__skip_checking_task_id: - task_status = self.task_view.get_status(task_id=task_id) - if task_status != TaskStatus.REQUESTING_RESOURCES: - # this implies the Task has been cancelled or errored somewhere else in the chain -- we should - # not allocate any resources to the broken Task. - self.update_request_status( - request_id=resource_request["_id"], - status=RequestStatus.CANCELED, - ) - return + task_status = self.task_view.get_status(task_id=task_id) + if task_status != TaskStatus.REQUESTING_RESOURCES: + # this implies the Task has been cancelled or errored somewhere else in the chain -- we should + # not allocate any resources to the broken Task. + self.update_request_status( + request_id=resource_request["_id"], + status=RequestStatus.CANCELED, + ) + return devices = self.device_view.request_devices( task_id=task_id, @@ -402,7 +298,7 @@ def _occupy_devices(self, devices: dict[str, dict[str, Any]], task_id: ObjectId) ) def _occupy_sample_positions( - self, sample_positions: dict[str, list[dict[str, Any]]], task_id: ObjectId + self, sample_positions: dict[str, list[dict[str, Any]]], task_id: ObjectId ): for sample_positions_ in sample_positions.values(): for sample_position_ in sample_positions_: @@ -416,141 +312,9 @@ def _release_devices(self, devices: dict[str, dict[str, Any]]): self.device_view.release_device(device["name"]) def _release_sample_positions( - self, sample_positions: dict[str, list[dict[str, Any]]] + self, sample_positions: dict[str, list[dict[str, Any]]] ): for sample_positions_ in sample_positions.values(): for sample_position in sample_positions_: if sample_position["need_release"]: self.sample_view.release_sample_position(sample_position["name"]) - - def _check_for_request_cycle(self): - """Check if there is a cycle in the request graph. (ie tasks occupy sample positions required by one another, - no requests can be fulfilled). If found, use a reroute task to fix the cycle. This function will only trigger - if a reroute task has been defined using `add_reroute`. - """ - tasks = self.task_view.get_tasks_by_status(TaskStatus.REQUESTING_RESOURCES) - - if len(tasks) < 2: - return [], None # no cycle to fix - - # get occupied and requested positions per task that is currently requesting resources - occupied_by_task = {} - requested_by_task = {} - task_priority = {} - task_ids_to_consider = [] - for t in tasks: - request = self._request_collection.find_one( - {"task_id": t["task_id"], "status": RequestStatus.PENDING.name} - ) # DB_ACCESS_OUTSIDE_VIEW - if request is None: - # race condition. task must have had resource request fulfilled between getting task and - # request entries. - continue - if "parsed_sample_positions_request" not in request: - # slight delay between setting TaskStatus.REQUESTING_RESOURCES and generating - # parsed_sample_positions_request. can catch these on the next call if necessary. - continue - task_ids_to_consider.append(t["task_id"]) - occupied = occupied_by_task[t["task_id"]] = [] - blocked = requested_by_task[t["task_id"]] = [] - task_priority[t["task_id"]] = request["priority"] - for s in t["samples"]: - occupied.append(self.sample_view.get_sample(s["sample_id"]).position) - for r in request["parsed_sample_positions_request"]: - if ( - len( - self.sample_view.get_available_sample_position( - task_id=t["task_id"], position_prefix=r["prefix"] - ) - ) - < r["number"] - ): - blocked.append( - r["prefix"] - ) # we dont have enough available positions for this request - - # construct a directed graph where nodes are task_id's, and edges indicate that the tail node is blocked by - # the head node (ie the tail task is requesting a sample_position occupied by the head task) - edges = [] - for i, t0 in enumerate(task_ids_to_consider): - for j, t1 in enumerate(task_ids_to_consider): - if i == j: - continue - if any( - occupied in requested_by_task[t0] - for occupied in occupied_by_task[t1] - ): - edges.append((t0, t1)) - - if len(edges) < 2: - return [], None # no cycle without at least two edges - g = nx.DiGraph(edges) - try: - cycle = nx.find_cycle( - g - ) # a cycle indicates a set of tasks that are blocking one another - except nx.NetworkXNoCycle: - return [], None # no cycle to fix - - # get the highest priority task in the cycle. We will unblock this task. - highest_priority = -inf - for _blocking_taskid, _occupying_taskid in cycle: - priority = task_priority[_blocking_taskid] - if priority > highest_priority: - highest_priority = priority - occupying_taskid = _occupying_taskid - blocked = requested_by_task[_blocking_taskid] - occupied = occupied_by_task[_occupying_taskid] - positions_to_vacate = [p for p in occupied if p in blocked] - - return positions_to_vacate, occupying_taskid - - def _reroute_to_fix_request_cycle( - self, - task_id: ObjectId, - sample_positions: list[str], - ): - from alab_management.lab_view import LabView - - """ - Runs rerouting tasks (as specified by add_reroute_task) to vacate sample_positions to resolve a request cycle. - - task_id: the task_id of the blocking task that will be rerouted sample_positions: sample_positions occupied - by the blocking task which will be moved by the appropriate reroute task.""" - - self.__reroute_in_progress = True - lab_view = LabView(task_id=task_id) - for fix_position in sample_positions: - if fix_position not in _reroute_registry: - raise ValueError( - f'No reroute task defined to move sample out from sample_position "{fix_position}". Please add a ' - f"reroute task using `add_reroute`" - ) - reroute_Task: BaseTask = _reroute_registry[fix_position] - - sample_to_move = self.sample_view._sample_collection.find_one( - {"position": fix_position} - ) # DB_ACCESS_OUTSIDE_VIEW - lab_view.logger.system_log( - level=LoggingLevel.INFO, - log_data={ - "logged_by": "TaskManager", - "type": "Reroute", - "reroute_task": { - "task_type": reroute_Task.func.__name__, - "kwargs": reroute_Task.keywords, - }, - "reroute_target": { - "task_id": task_id, - "sample_id": sample_to_move["_id"], - "sample_position": fix_position, - }, - }, - ) - reroute_Task( - task_id=task_id, - lab_view=lab_view, - priority=TaskPriority.HIGH, - sample=sample_to_move["_id"], - ).run() - self.__reroute_in_progress = False diff --git a/alab_management/task_view/task_enums.py b/alab_management/task_view/task_enums.py index 32d94f49..d90eb559 100644 --- a/alab_management/task_view/task_enums.py +++ b/alab_management/task_view/task_enums.py @@ -20,26 +20,20 @@ class TaskStatus(Enum): The status of one task. - ``WAITING``: the task cannot start yet, waiting for preceding tasks to finish - - ``INITIATED``: the task has been sent to task actor, but not yet running - - ``PAUSED``: the task is paused by user - - ``STOPPED``: the task has been stopped due to the system shutdown - ``READY``: the task is ready to submit + - ``INITIATED``: the task has been sent to task actor, but not yet running - ``REQUESTING_RESOURCES``: the task is requesting resources - ``RUNNING``: the task is currently running - ``ERROR``: the task encountered some errors during execution - ``COMPLETED``: the task is completed - - ``CANCELLING``: a cancelling request has been submitted - ``CANCELLED``: the task has been cancelled and stopped. """ WAITING = auto() - INITIATED = auto() - PAUSED = auto() - STOPPED = auto() READY = auto() + INITIATED = auto() REQUESTING_RESOURCES = auto() RUNNING = auto() ERROR = auto() COMPLETED = auto() - CANCELLING = auto() CANCELLED = auto() diff --git a/alab_management/task_view/task_view.py b/alab_management/task_view/task_view.py index 94393840..6b6695ef 100644 --- a/alab_management/task_view/task_view.py +++ b/alab_management/task_view/task_view.py @@ -8,11 +8,10 @@ from bson import ObjectId +from alab_management.task_view.completed_task_view import CompletedTaskView from alab_management.task_view.task import BaseTask, get_all_tasks from alab_management.task_view.task_enums import TaskStatus -from alab_management.utils.data_objects import get_collection, get_lock, make_bsonable - -from .completed_task_view import CompletedTaskView +from alab_management.utils.data_objects import get_collection, make_bsonable completed_task_view = CompletedTaskView() @@ -20,19 +19,19 @@ class TaskView: """Task view manages the status, parameters of a task.""" - def __init__(self): + def __init__(self, allow_update_status: bool = False): self._task_collection = get_collection("tasks") - self._lock = get_lock("tasks") self._tasks_definition: dict[str, type[BaseTask]] = get_all_tasks() + self._allow_update_status = allow_update_status def create_task( - self, - task_type: str, - samples: list[ObjectId], - parameters: dict[str, Any], - prev_tasks: ObjectId | list[ObjectId] | None = None, - next_tasks: ObjectId | list[ObjectId] | None = None, - task_id: ObjectId | None = None, + self, + task_type: str, + samples: list[ObjectId], + parameters: dict[str, Any], + prev_tasks: ObjectId | list[ObjectId] | None = None, + next_tasks: ObjectId | list[ObjectId] | None = None, + task_id: ObjectId | None = None, ) -> ObjectId: """ Insert a task into the task collection. @@ -81,7 +80,7 @@ def create_task( return cast(ObjectId, result.inserted_id) def create_subtask( - self, task_id, subtask_type, samples: list[str], parameters: dict + self, task_id, subtask_type, samples: list[str], parameters: dict ): """Create a subtask entry for a task.""" task = self.get_task(task_id=task_id) @@ -137,7 +136,7 @@ def get_task(self, task_id: ObjectId, encode: bool = False) -> dict[str, Any]: result = self.encode_task(result) return result - def get_task_with_sample(self, sample_id: ObjectId) -> dict[str, Any] | None: + def get_task_with_sample(self, sample_id: ObjectId) -> list[dict[str, Any]] | None: """Get a task that contains the sample with the provided id.""" result = self._task_collection.find({"samples.sample_id": sample_id}) if result is None: @@ -163,6 +162,8 @@ def update_status(self, task_id: ObjectId, status: TaskStatus): task_id: the id of task to be updated status: the new status of the task """ + self._check_allow_update_status() + task = self.get_task(task_id=task_id, encode=False) update_dict = { @@ -230,7 +231,7 @@ def update_status(self, task_id: ObjectId, status: TaskStatus): ) # in case it was only waiting on task we just cancelled def update_subtask_status( - self, task_id: ObjectId, subtask_id: ObjectId, status: TaskStatus + self, task_id: ObjectId, subtask_id: ObjectId, status: TaskStatus ): """Update the status of a subtask.""" task = self.get_task(task_id=task_id, encode=False) @@ -257,7 +258,7 @@ def update_subtask_status( ) def update_result( - self, task_id: ObjectId, name: str | None = None, value: Any = None + self, task_id: ObjectId, name: str | None = None, value: Any = None ): """ Update result to completed job. @@ -286,7 +287,7 @@ def update_result( ) def update_subtask_result( - self, task_id: ObjectId, subtask_id: ObjectId, result: Any + self, task_id: ObjectId, subtask_id: ObjectId, result: Any ): """ Update result of completed subtask within task job. @@ -326,12 +327,14 @@ def try_to_mark_task_ready(self, task_id: ObjectId): Check if one task's parent tasks are all completed, if so, mark it as READY. """ + self._check_allow_update_status() + task = self.get_task(task_id) prev_task_ids = task["prev_tasks"] if task["status"] == TaskStatus.WAITING.name and all( - self.get_status(task_id=task_id_) is TaskStatus.COMPLETED - for task_id_ in prev_task_ids + self.get_status(task_id=task_id_) is TaskStatus.COMPLETED + for task_id_ in prev_task_ids ): self.update_status(task_id, TaskStatus.READY) @@ -377,10 +380,10 @@ def encode_task(self, task_entry: dict[str, Any]) -> dict[str, Any]: } def update_task_dependency( - self, - task_id: ObjectId, - prev_tasks: ObjectId | list[ObjectId] | None = None, - next_tasks: ObjectId | list[ObjectId] | None = None, + self, + task_id: ObjectId, + prev_tasks: ObjectId | list[ObjectId] | None = None, + next_tasks: ObjectId | list[ObjectId] | None = None, ): """ Add prev tasks and next tasks to one task entry, @@ -449,31 +452,34 @@ def set_task_actor_id(self, task_id: ObjectId, message_id: str): }, ) - def mark_task_as_cancelling(self, task_id: ObjectId): - """ - Try to cancel a task by marking the task as TaskStatus.CANCELLING. - - If the status is not in [READY, INITIATED, WAITING, PAUSED, READY, RUNNING], - the request will be ignored and returned. + def mark_task_as_canceling(self, task_id: ObjectId) -> bool: + """Try to cancel a task by setting the field "stopping" to True.""" + entry = self._task_collection.find_one_and_update( + {"_id": task_id, "status": {"$in": [ + TaskStatus.RUNNING.name, TaskStatus.REQUESTING_RESOURCES.name], + }}, + { + "$set": { + "canceling": True, + "last_updated": datetime.now(), + } + } + ) + return entry is not None - The task manager will handle it. - """ - current_status = self.get_status(task_id=task_id) - - if current_status in [ - TaskStatus.READY, - TaskStatus.INITIATED, - TaskStatus.WAITING, - TaskStatus.REQUESTING_RESOURCES, - TaskStatus.PAUSED, - TaskStatus.READY, - TaskStatus.RUNNING, - ]: - self.update_status( - task_id=ObjectId(task_id), - status=TaskStatus.CANCELLING, - ) + def get_tasks_to_be_canceled(self) -> list[dict[str, Any]]: + """Get a list of tasks that are in the process of being canceled.""" + result = self._task_collection.find({"canceling": True, "status": {"$in": [ + TaskStatus.RUNNING.name, TaskStatus.REQUESTING_RESOURCES.name], + }}) + return [self.encode_task(task) for task in result] def exists(self, task_id: ObjectId | str) -> bool: """Check if a task id exists.""" - return self._task_collection.count_documents({"_id": ObjectId(task_id)}) > 0 + return self._task_collection.find_one({"_id": ObjectId(task_id)}) is not None + + def _check_allow_update_status(self): + if not self._allow_update_status: + raise PermissionError( + "This method is not allowed to be called when `allow_update_status` is False" + ) diff --git a/alab_management/utils/data_objects.py b/alab_management/utils/data_objects.py index d4f8e386..cafa223c 100644 --- a/alab_management/utils/data_objects.py +++ b/alab_management/utils/data_objects.py @@ -13,11 +13,14 @@ from pymongo import collection, database from alab_management.config import AlabOSConfig - -from .db_lock import MongoLock +from alab_management.utils.db_lock import MongoLock class _BaseGetMongoCollection(ABC): + client: pymongo.MongoClient | None = None + db: database.Database | None = None + db_lock: MongoLock | None = None + @classmethod @abstractmethod def init(cls): @@ -39,13 +42,8 @@ def get_lock(cls, name: str) -> MongoLock: class _GetMongoCollection(_BaseGetMongoCollection): - client: pymongo.MongoClient | None = None - db: database.Database | None = None - db_lock: MongoLock | None = None - @classmethod def init(cls): - db_config = AlabOSConfig()["mongodb"] cls.client = pymongo.MongoClient( host=db_config.get("host", None), @@ -54,27 +52,20 @@ def init(cls): password=db_config.get("password", ""), ) sim_mode_flag = AlabOSConfig().is_sim_mode() - if sim_mode_flag: - cls.db = cls.client[AlabOSConfig()["general"]["name"] + "_sim"] - else: - cls.db = cls.client[AlabOSConfig()["general"]["name"]] # type: ignore # pylint: disable=unsubscriptable-object + # force to enable sim mode, just in case + cls.db = cls.client[AlabOSConfig()["general"]["name"] + ("_sim" * sim_mode_flag)] class _GetCompletedMongoCollection(_BaseGetMongoCollection): - client: pymongo.MongoClient | None = None - db: database.Database | None = None - db_lock: MongoLock | None = None - @classmethod def init(cls): - - ALABOS_CONFIG = AlabOSConfig() - if "mongodb_completed" not in ALABOS_CONFIG: + alabos_config = AlabOSConfig() + if "mongodb_completed" not in alabos_config: raise ValueError( "Cannot use the completed database feature until that database info is set. Please specify the " "mongodb_completed configuration in the config file!" ) - db_config = ALABOS_CONFIG["mongodb_completed"] + db_config = alabos_config["mongodb_completed"] cls.client = pymongo.MongoClient( host=db_config.get("host", None), port=db_config.get("port", None), @@ -85,7 +76,7 @@ def init(cls): if sim_mode_flag: cls.db = cls.client[ AlabOSConfig()["general"]["name"] + "(completed)" + "_sim" - ] + ] else: cls.db = cls.client[AlabOSConfig()["general"]["name"] + "(completed)"] # type: ignore # pylint: disable=unsubscriptable-object From c5c3426e12ee666f4587a23a064c2fa32f509133 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Mon, 29 Apr 2024 22:19:52 -0700 Subject: [PATCH 02/27] finish modification of task mgr --- alab_management/task_actor.py | 67 ++++++++++---------- alab_management/task_manager/task_manager.py | 2 +- alab_management/task_view/task_enums.py | 2 + alab_management/task_view/task_view.py | 12 +--- 4 files changed, 36 insertions(+), 47 deletions(-) diff --git a/alab_management/task_actor.py b/alab_management/task_actor.py index 8e1fb3ef..ad2994e2 100644 --- a/alab_management/task_actor.py +++ b/alab_management/task_actor.py @@ -9,7 +9,6 @@ import dramatiq from bson import ObjectId from dramatiq import get_broker -from dramatiq.middleware import Shutdown from dramatiq_abort import Abort, Abortable, backends from alab_management.logger import DBLogger @@ -28,10 +27,6 @@ """ -class ParameterError(Exception): - """The exception raised when parameters of a task is wrong.""" - - @dramatiq.actor( max_retries=0, time_limit=30 * 24 * 60 * 60 * 1000, @@ -107,7 +102,6 @@ def run_task(task_id_str: str): raise Exception( f"Failed to create task {task_id} of type {task_type!s}" ) from exception - # raise ParameterError(exception.args[0]) from exception try: task_view.update_status(task_id=task_id, status=TaskStatus.RUNNING) @@ -129,45 +123,47 @@ def run_task(task_id_str: str): # from Alab_one, for eg: Powder dosing. Powder dosing class will have a method "run". result = task.run() except Abort: - # task_view.update_status(task_id=task_id, status=TaskStatus.CANCELLED) + task_status = TaskStatus.CANCELLED + task_view.update_status(task_id=task_id, status=TaskStatus.FINISHING) task_view.set_message( task_id=task_id, message="Task was cancelled due to the abort signal" ) # display exception on the dashboard - # logger.system_log( - # level="ERROR", - # log_data={ - # "logged_by": "TaskActor", - # "type": "TaskEnd", - # "task_id": task_id, - # "task_type": task_type.__name__, - # "status": TaskStatus.CANCELLED.name, - # "traceback": "Task was cancelled due to the abort signal", - # }, - # ) - # lab_view.request_cleanup() + logger.system_log( + level="ERROR", + log_data={ + "logged_by": "TaskActor", + "type": "TaskEnd", + "task_id": task_id, + "task_type": task_type.__name__, + "status": TaskStatus.CANCELLED.name, + "traceback": "Task was cancelled due to the abort signal", + }, + ) + lab_view.request_cleanup() except Exception: - # task_view.update_status(task_id=task_id, status=TaskStatus.ERROR) + task_status = TaskStatus.ERROR + task_view.update_status(task_id=task_id, status=TaskStatus.FINISHING) formatted_exception = format_exc() task_view.set_message( task_id=task_id, message=formatted_exception ) # display exception on the dashboard - # logger.system_log( - # level="ERROR", - # log_data={ - # "logged_by": "TaskActor", - # "type": "TaskEnd", - # "task_id": task_id, - # "task_type": task_type.__name__, - # "status": "ERROR", - # "traceback": formatted_exception, - # }, - # ) - # lab_view.request_cleanup() - raise + logger.system_log( + level="ERROR", + log_data={ + "logged_by": "TaskActor", + "type": "TaskEnd", + "task_id": task_id, + "task_type": task_type.__name__, + "status": "ERROR", + "traceback": formatted_exception, + }, + ) + lab_view.request_cleanup() else: - # task_view.update_status(task_id=task_id, status=TaskStatus.COMPLETED) + task_status = TaskStatus.COMPLETED + task_view.update_status(task_id=task_id, status=TaskStatus.FINISHING) if result is None: - ... + pass elif isinstance(result, dict): for key, value in result.items(): # we do this per item to avoid overwriting existing results. Its possible that some results were @@ -193,3 +189,4 @@ def run_task(task_id_str: str): sample_view.update_sample_task_id( task_id=None, sample_id=sample["sample_id"] ) + task_view.update_status(task_id=task_id, status=task_status) diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index 375bc722..b19e5b8f 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -38,7 +38,7 @@ class TaskManager(RequestMixin): def __init__(self): load_definition() - self.task_view = TaskView(allow_update_status=True) + self.task_view = TaskView() self.sample_view = SampleView() self.device_view = DeviceView() self._request_collection = get_collection("requests") diff --git a/alab_management/task_view/task_enums.py b/alab_management/task_view/task_enums.py index d90eb559..966a8812 100644 --- a/alab_management/task_view/task_enums.py +++ b/alab_management/task_view/task_enums.py @@ -24,6 +24,7 @@ class TaskStatus(Enum): - ``INITIATED``: the task has been sent to task actor, but not yet running - ``REQUESTING_RESOURCES``: the task is requesting resources - ``RUNNING``: the task is currently running + - ``FINISHING``: the task is finishing up, but not yet completed/errored/cancelled - ``ERROR``: the task encountered some errors during execution - ``COMPLETED``: the task is completed - ``CANCELLED``: the task has been cancelled and stopped. @@ -34,6 +35,7 @@ class TaskStatus(Enum): INITIATED = auto() REQUESTING_RESOURCES = auto() RUNNING = auto() + FINISHING = auto() ERROR = auto() COMPLETED = auto() CANCELLED = auto() diff --git a/alab_management/task_view/task_view.py b/alab_management/task_view/task_view.py index 7169860c..152a93c0 100644 --- a/alab_management/task_view/task_view.py +++ b/alab_management/task_view/task_view.py @@ -19,10 +19,9 @@ class TaskView: """Task view manages the status, parameters of a task.""" - def __init__(self, allow_update_status: bool = False): + def __init__(self): self._task_collection = get_collection("tasks") self._tasks_definition: dict[str, type[BaseTask]] = get_all_tasks() - self._allow_update_status = allow_update_status def create_task( self, @@ -161,8 +160,6 @@ def update_status(self, task_id: ObjectId, status: TaskStatus): task_id: the id of task to be updated status: the new status of the task """ - self._check_allow_update_status() - task = self.get_task(task_id=task_id, encode=False) update_dict = { "status": status.name, @@ -322,8 +319,6 @@ def try_to_mark_task_ready(self, task_id: ObjectId): Check if one task's parent tasks are all completed, if so, mark it as READY. """ - self._check_allow_update_status() - task = self.get_task(task_id) prev_task_ids = task["prev_tasks"] @@ -472,8 +467,3 @@ def exists(self, task_id: ObjectId | str) -> bool: """Check if a task id exists.""" return self._task_collection.find_one({"_id": ObjectId(task_id)}) is not None - def _check_allow_update_status(self): - if not self._allow_update_status: - raise PermissionError( - "This method is not allowed to be called when `allow_update_status` is False" - ) From 5c3eccb5df4c367166f50808e3601bf40ae734a0 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Mon, 29 Apr 2024 23:40:50 -0700 Subject: [PATCH 03/27] a workable example --- alab_management/utils/middleware.py | 0 pytest.ini | 4 ---- 2 files changed, 4 deletions(-) create mode 100644 alab_management/utils/middleware.py delete mode 100644 pytest.ini diff --git a/alab_management/utils/middleware.py b/alab_management/utils/middleware.py new file mode 100644 index 00000000..e69de29b diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index 7d6f49c7..00000000 --- a/pytest.ini +++ /dev/null @@ -1,4 +0,0 @@ -[pytest] -testpaths = ./tests -env = - ALABOS_CONFIG_PATH = ./tests/fake_lab/config.toml From c82cf23f6733d4e8f5319fce67c12596c2312f44 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Tue, 30 Apr 2024 00:12:14 -0700 Subject: [PATCH 04/27] a workable example --- alab_management/resource_manager/__init__.py | 0 alab_management/{task_manager => resource_manager}/enums.py | 0 alab_management/resource_manager/resource_manager.py | 0 .../{task_manager => resource_manager}/resource_requester.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 alab_management/resource_manager/__init__.py rename alab_management/{task_manager => resource_manager}/enums.py (100%) create mode 100644 alab_management/resource_manager/resource_manager.py rename alab_management/{task_manager => resource_manager}/resource_requester.py (100%) diff --git a/alab_management/resource_manager/__init__.py b/alab_management/resource_manager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alab_management/task_manager/enums.py b/alab_management/resource_manager/enums.py similarity index 100% rename from alab_management/task_manager/enums.py rename to alab_management/resource_manager/enums.py diff --git a/alab_management/resource_manager/resource_manager.py b/alab_management/resource_manager/resource_manager.py new file mode 100644 index 00000000..e69de29b diff --git a/alab_management/task_manager/resource_requester.py b/alab_management/resource_manager/resource_requester.py similarity index 100% rename from alab_management/task_manager/resource_requester.py rename to alab_management/resource_manager/resource_requester.py From d4d1d35c478c78b68106fddc50a4aa7835808c38 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Tue, 30 Apr 2024 00:18:05 -0700 Subject: [PATCH 05/27] a workable example --- alab_management/experiment_manager.py | 3 +- .../experiment_view/experiment_view.py | 7 +- alab_management/lab_view.py | 2 +- alab_management/resource_manager/enums.py | 2 +- .../resource_manager/resource_manager.py | 207 ++++++++++++++++++ .../resource_manager/resource_requester.py | 73 +++--- alab_management/scripts/launch_lab.py | 12 + alab_management/task_actor.py | 13 +- alab_management/task_manager/task_manager.py | 176 +-------------- alab_management/task_view/task_view.py | 9 +- alab_management/user_input.py | 2 - alab_management/utils/middleware.py | 11 + pyproject.toml | 32 +-- tests/test_lab_view.py | 8 +- tests/test_launch.py | 49 +++-- tests/test_task_manager.py | 17 +- 16 files changed, 335 insertions(+), 288 deletions(-) diff --git a/alab_management/experiment_manager.py b/alab_management/experiment_manager.py index db62123d..3d80472b 100644 --- a/alab_management/experiment_manager.py +++ b/alab_management/experiment_manager.py @@ -47,7 +47,7 @@ def run(self): ) while True: self._loop() - time.sleep(1) + # time.sleep() def _loop(self): self.handle_pending_experiments() @@ -163,7 +163,6 @@ def mark_completed_experiments(self): TaskStatus.COMPLETED, TaskStatus.ERROR, TaskStatus.CANCELLED, - TaskStatus.STOPPED, } for task_id in task_ids ): diff --git a/alab_management/experiment_view/experiment_view.py b/alab_management/experiment_view/experiment_view.py index c2f98193..e7c08671 100644 --- a/alab_management/experiment_view/experiment_view.py +++ b/alab_management/experiment_view/experiment_view.py @@ -13,8 +13,6 @@ from .completed_experiment_view import CompletedExperimentView from .experiment import InputExperiment -completed_experiment_view = CompletedExperimentView() - class ExperimentStatus(Enum): """ @@ -39,6 +37,7 @@ def __init__(self): self._experiment_collection = get_collection("experiment") self.sample_view = SampleView() self.task_view = TaskView() + self.completed_experiment_view = CompletedExperimentView() def create_experiment(self, experiment: InputExperiment) -> ObjectId: """ @@ -103,7 +102,7 @@ def get_experiment(self, exp_id: ObjectId) -> dict[str, Any] | None: experiment = self._experiment_collection.find_one({"_id": exp_id}) if experiment is None: try: - experiment = completed_experiment_view.get_experiment( + experiment = self.completed_experiment_view.get_experiment( experiment_id=exp_id ) except ValueError: @@ -167,7 +166,7 @@ def get_experiment_by_task_id(self, task_id: ObjectId) -> dict[str, Any] | None: experiment = self._experiment_collection.find_one({"tasks.task_id": task_id}) if experiment is None: raise ValueError(f"Cannot find experiment containing task_id: {task_id}") - return experiment + return dict(experiment) def get_experiment_by_sample_id(self, sample_id: ObjectId) -> dict[str, Any] | None: """Get an experiment that contains a sample with the given sample_id.""" diff --git a/alab_management/lab_view.py b/alab_management/lab_view.py index 808a007f..e83d98c7 100644 --- a/alab_management/lab_view.py +++ b/alab_management/lab_view.py @@ -20,7 +20,7 @@ from alab_management.logger import DBLogger from alab_management.sample_view.sample import Sample from alab_management.sample_view.sample_view import SamplePositionRequest, SampleView -from alab_management.task_manager.resource_requester import ResourceRequester +from alab_management.resource_manager.resource_requester import ResourceRequester from alab_management.task_view.task import BaseTask from alab_management.task_view.task_enums import TaskPriority, TaskStatus from alab_management.task_view.task_view import TaskView diff --git a/alab_management/resource_manager/enums.py b/alab_management/resource_manager/enums.py index b117f13f..b9d55315 100644 --- a/alab_management/resource_manager/enums.py +++ b/alab_management/resource_manager/enums.py @@ -1,4 +1,4 @@ -from enum import Enum, auto +from enum import auto, Enum _EXTRA_REQUEST: str = "__nodevice" diff --git a/alab_management/resource_manager/resource_manager.py b/alab_management/resource_manager/resource_manager.py index e69de29b..c8b46488 100644 --- a/alab_management/resource_manager/resource_manager.py +++ b/alab_management/resource_manager/resource_manager.py @@ -0,0 +1,207 @@ +""" +TaskLauncher is the core module of the system, +which actually executes the tasks. +""" + +import time +from datetime import datetime +from typing import Any, cast + +import dill +from bson import ObjectId + +from alab_management.device_view.device_view import DeviceView +from alab_management.logger import DBLogger +from alab_management.resource_manager.enums import _EXTRA_REQUEST +from alab_management.resource_manager.resource_requester import ( + RequestMixin, + RequestStatus, +) +from alab_management.sample_view.sample import SamplePosition +from alab_management.sample_view.sample_view import SamplePositionRequest, SampleView +from alab_management.task_view import TaskView +from alab_management.task_view.task_enums import TaskStatus +from alab_management.utils.data_objects import get_collection +from alab_management.utils.module_ops import load_definition + + +class ResourceManager(RequestMixin): + """ + TaskManager will. + + (1) find all the ready tasks and submit them, + (2) handle all the resource requests + """ + + def __init__(self): + load_definition() + self.task_view = TaskView() + self.sample_view = SampleView() + self.device_view = DeviceView() + self._request_collection = get_collection("requests") + + self.logger = DBLogger(task_id=None) + super().__init__() + time.sleep(1) # allow some time for other modules to launch + + def run(self): + """Start the loop.""" + while True: + self._loop() + # time.sleep(1) + + def _loop(self): + self.handle_released_resources() + self.handle_requested_resources() + + def handle_released_resources(self): + """Release the resources.""" + for request in self.get_requests_by_status(RequestStatus.NEED_RELEASE): + devices = request["assigned_devices"] + sample_positions = request["assigned_sample_positions"] + self._release_devices(devices) + self._release_sample_positions(sample_positions) + self.update_request_status( + request_id=request["_id"], status=RequestStatus.RELEASED + ) + + def handle_requested_resources(self): + """ + Check if there are any requests that are in PENDING status. If so, + try to assign the resources to it. + """ + requests = list(self.get_requests_by_status(RequestStatus.PENDING)) + # prioritize the oldest requests at the highest priority value + requests.sort(key=lambda x: x["submitted_at"]) + requests.sort(key=lambda x: x["priority"], reverse=True) + for request in requests: + self._handle_requested_resources(request) + + def _handle_requested_resources(self, request_entry: dict[str, Any]): + try: + resource_request = request_entry["request"] + task_id = request_entry["task_id"] + + task_status = self.task_view.get_status(task_id=task_id) + if task_status != TaskStatus.REQUESTING_RESOURCES: + # this implies the Task has been cancelled or errored somewhere else in the chain -- we should + # not allocate any resources to the broken Task. + self.update_request_status( + request_id=resource_request["_id"], + status=RequestStatus.CANCELED, + ) + return + + devices = self.device_view.request_devices( + task_id=task_id, + device_names_str=[ + entry["device"]["content"] + for entry in resource_request + if entry["device"]["identifier"] == "name" + ], + device_types_str=[ + entry["device"]["content"] + for entry in resource_request + if entry["device"]["identifier"] == "type" + ], + ) + # some devices are not available now + # the request cannot be fulfilled + if devices is None: + return + + # replace device placeholder in sample position request + # and make it into a single list + parsed_sample_positions_request = [] + for request in resource_request: + if request["device"]["identifier"] == _EXTRA_REQUEST: + device_prefix = "" + else: + device_name = devices[request["device"]["content"]]["name"] + device_prefix = f"{device_name}{SamplePosition.SEPARATOR}" + + for pos in request["sample_positions"]: + prefix = pos["prefix"] + # if this is a nested resource request, lets not prepend the device name twice. + if not prefix.startswith(device_prefix): + prefix = device_prefix + prefix + parsed_sample_positions_request.append( + SamplePositionRequest(prefix=prefix, number=pos["number"]) + ) + + self._request_collection.update_one( + {"_id": request_entry["_id"]}, + { + "$set": { + "parsed_sample_positions_request": [ + dict(spr) for spr in parsed_sample_positions_request + ] + } + }, + ) + sample_positions = self.sample_view.request_sample_positions( + task_id=task_id, sample_positions=parsed_sample_positions_request + ) + if sample_positions is None: + return + + # in case some errors happen, we will raise the error in the task process instead of the main process + except Exception as error: # pylint: disable=broad-except + self._request_collection.update_one( + {"_id": request_entry["_id"]}, + { + "$set": { + "status": RequestStatus.ERROR.name, + "error": dill.dumps(error), + "assigned_devices": None, + "assigned_sample_positions": None, + } + }, + ) + return + + # if both devices and sample positions can be satisfied + self._request_collection.update_one( + {"_id": request_entry["_id"]}, + { + "$set": { + "assigned_devices": devices, + "assigned_sample_positions": sample_positions, + "status": RequestStatus.FULFILLED.name, + "fulfilled_at": datetime.now(), + } + }, + ) + # label the resources as occupied + self._occupy_devices(devices=devices, task_id=task_id) + self._occupy_sample_positions( + sample_positions=sample_positions, task_id=task_id + ) + + def _occupy_devices(self, devices: dict[str, dict[str, Any]], task_id: ObjectId): + for device in devices.values(): + self.device_view.occupy_device( + device=cast(str, device["name"]), task_id=task_id + ) + + def _occupy_sample_positions( + self, sample_positions: dict[str, list[dict[str, Any]]], task_id: ObjectId + ): + for sample_positions_ in sample_positions.values(): + for sample_position_ in sample_positions_: + self.sample_view.lock_sample_position( + task_id, cast(str, sample_position_["name"]) + ) + + def _release_devices(self, devices: dict[str, dict[str, Any]]): + for device in devices.values(): + if device["need_release"]: + self.device_view.release_device(device["name"]) + + def _release_sample_positions( + self, sample_positions: dict[str, list[dict[str, Any]]] + ): + for sample_positions_ in sample_positions.values(): + for sample_position in sample_positions_: + if sample_position["need_release"]: + self.sample_view.release_sample_position(sample_position["name"]) diff --git a/alab_management/resource_manager/resource_requester.py b/alab_management/resource_manager/resource_requester.py index a285f512..06f1decb 100644 --- a/alab_management/resource_manager/resource_requester.py +++ b/alab_management/resource_manager/resource_requester.py @@ -17,13 +17,12 @@ from alab_management.device_view.device import BaseDevice from alab_management.device_view.device_view import DeviceView +from alab_management.resource_manager.enums import _EXTRA_REQUEST, RequestStatus from alab_management.sample_view.sample import SamplePosition from alab_management.sample_view.sample_view import SamplePositionRequest from alab_management.task_view import TaskPriority from alab_management.utils.data_objects import get_collection -from .enums import _EXTRA_REQUEST, RequestStatus - _SampleRequestDict = dict[str, int] _ResourceRequestDict = dict[ type[BaseDevice] | str | None, _SampleRequestDict @@ -99,7 +98,7 @@ def update_request_status(self, request_id: ObjectId, status: RequestStatus): ) # wait for the request to be updated while ( - self.get_request(request_id, projection=["status"])["status"] != status.name + self.get_request(request_id, projection=["status"])["status"] != status.name ): time.sleep(0.5) return value_returned @@ -130,8 +129,8 @@ class ResourceRequester(RequestMixin): """ def __init__( - self, - task_id: ObjectId, + self, + task_id: ObjectId, ): self._request_collection = get_collection("requests") self._waiting: dict[ObjectId, dict[str, Any]] = {} @@ -157,10 +156,10 @@ def __close__(self): __del__ = __close__ def request_resources( - self, - resource_request: _ResourceRequestDict, - timeout: float | None = None, - priority: TaskPriority | int | None = None, + self, + resource_request: _ResourceRequestDict, + timeout: float | None = None, + priority: TaskPriority | int | None = None, ) -> dict[str, Any]: """ Request lab resources. @@ -235,11 +234,11 @@ def request_resources( result = f.result(timeout=None) except ( - TimeoutError + TimeoutError ): # cancel the task if timeout, make sure it is not fulfilled if ( - self.get_request(_id, projection=["status"])["status"] - != RequestStatus.FULFILLED.name + self.get_request(_id, projection=["status"])["status"] + != RequestStatus.FULFILLED.name ): self.update_request_status( request_id=_id, status=RequestStatus.CANCELED @@ -268,7 +267,7 @@ def release_resources(self, request_id: ObjectId): request = self.get_request(request_id) if request["status"] in [RequestStatus.CANCELED.name, RequestStatus.ERROR.name]: if ("assigned_devices" in request) or ( - "assigned_sample_positions" in request + "assigned_sample_positions" in request ): self.update_request_status(request_id, RequestStatus.NEED_RELEASE) # wait for the request to be updated to NEED_RELEASE or have been released @@ -295,8 +294,8 @@ def release_resources(self, request_id: ObjectId): ) # wait for the request to be updated to NEED_RELEASE or have been released while ( - self.get_request(request_id, projection=["status"])["status"] - == RequestStatus.FULFILLED.name + self.get_request(request_id, projection=["status"])["status"] + == RequestStatus.FULFILLED.name ): time.sleep(0.5) @@ -338,8 +337,8 @@ def release_all_resources(self): RequestStatus.CANCELED.name, RequestStatus.ERROR.name, ] and ( - ("assigned_devices" in request) - or ("assigned_sample_positions" in request) + ("assigned_devices" in request) + or ("assigned_sample_positions" in request) ): self.update_request_status(request["_id"], RequestStatus.NEED_RELEASE) assigned_cancel_error_requests_id.append(request["_id"]) @@ -358,32 +357,32 @@ def release_all_resources(self): ) # wait for all the requests to be updated while any( - request["status"] - in [RequestStatus.FULFILLED.name, RequestStatus.PENDING.name] - for request in self.get_requests_by_task_id(self.task_id) + request["status"] + in [RequestStatus.FULFILLED.name, RequestStatus.PENDING.name] + for request in self.get_requests_by_task_id(self.task_id) ): time.sleep(0.5) if assigned_cancel_error_requests_id: # wait for the requests to be updated to updated to NEED_RELEASE or have been released while any( - request["status"] - in [RequestStatus.CANCELED.name, RequestStatus.ERROR.name] - for request in self.get_requests_by_task_id(self.task_id) - if request["_id"] in assigned_cancel_error_requests_id + request["status"] + in [RequestStatus.CANCELED.name, RequestStatus.ERROR.name] + for request in self.get_requests_by_task_id(self.task_id) + if request["_id"] in assigned_cancel_error_requests_id ): time.sleep(0.5) # wait for all the requests to be released or canceled or errored during the release while any( - ( - request["status"] - not in [ - RequestStatus.RELEASED.name, - RequestStatus.CANCELED.name, - RequestStatus.ERROR.name, - ] - ) - for request in self.get_requests_by_task_id(self.task_id) + ( + request["status"] + not in [ + RequestStatus.RELEASED.name, + RequestStatus.CANCELED.name, + RequestStatus.ERROR.name, + ] + ) + for request in self.get_requests_by_task_id(self.task_id) ): time.sleep(0.5) @@ -457,9 +456,9 @@ def _handle_canceled_request(self, request_id: ObjectId): @staticmethod def _post_process_requested_resource( - devices: dict[type[BaseDevice], str], - sample_positions: dict[str, list[str]], - resource_request: dict[str, list[dict[str, int | str]]], + devices: dict[type[BaseDevice], str], + sample_positions: dict[str, list[str]], + resource_request: dict[str, list[dict[str, int | str]]], ): processed_sample_positions: dict[ type[BaseDevice] | None, dict[str, list[str]] @@ -478,7 +477,7 @@ def _post_process_requested_resource( f"{devices[device_request]}{SamplePosition.SEPARATOR}" ) if not reply_prefix.startswith( - device_prefix + device_prefix ): # dont extra prepend for nested requests reply_prefix = device_prefix + reply_prefix processed_sample_positions[device_request][prefix] = sample_positions[ diff --git a/alab_management/scripts/launch_lab.py b/alab_management/scripts/launch_lab.py index 1f36ea37..f77c1c10 100644 --- a/alab_management/scripts/launch_lab.py +++ b/alab_management/scripts/launch_lab.py @@ -57,6 +57,16 @@ def launch_device_manager(): device_manager.run() +def launch_resource_manager(): + """Launch the resource manager.""" + from alab_management.resource_manager.resource_manager import ResourceManager + from alab_management.utils.module_ops import load_definition + + load_definition() + resource_manager = ResourceManager() + resource_manager.run() + + def launch_lab(host, port, debug): """Start to run the lab.""" from alab_management.device_view import DeviceView @@ -73,6 +83,7 @@ def launch_lab(host, port, debug): experiment_manager_thread = Thread(target=launch_experiment_manager) task_launcher_thread = Thread(target=launch_task_manager) device_manager_thread = Thread(target=launch_device_manager) + resource_manager_thread = Thread(target=launch_resource_manager) dashboard_thread.daemon = experiment_manager_thread.daemon = ( task_launcher_thread.daemon @@ -82,6 +93,7 @@ def launch_lab(host, port, debug): device_manager_thread.start() experiment_manager_thread.start() task_launcher_thread.start() + resource_manager_thread.start() while True: time.sleep(1.5) diff --git a/alab_management/task_actor.py b/alab_management/task_actor.py index ad2994e2..cd211a26 100644 --- a/alab_management/task_actor.py +++ b/alab_management/task_actor.py @@ -8,24 +8,13 @@ import dramatiq from bson import ObjectId -from dramatiq import get_broker -from dramatiq_abort import Abort, Abortable, backends +from dramatiq_abort import Abort from alab_management.logger import DBLogger from alab_management.sample_view import SampleView from alab_management.task_view import BaseTask, TaskStatus, TaskView -from alab_management.utils.data_objects import get_collection from alab_management.utils.module_ops import load_definition -abortable = Abortable( - backend=backends.MongoDBBackend(collection=get_collection("abortable")) -) -get_broker().add_middleware(abortable) -""" -This allows the task to be aborted. -The abort signal is sent by the user, and the task will be aborted at the next checkpoint. -""" - @dramatiq.actor( max_retries=0, diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index b19e5b8f..7723f046 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -4,31 +4,19 @@ """ import time -from datetime import datetime -from typing import Any, cast -import dill -from bson import ObjectId from dramatiq_abort import abort -from alab_management.device_view.device_view import DeviceView from alab_management.lab_view import LabView from alab_management.logger import DBLogger -from alab_management.sample_view.sample import SamplePosition -from alab_management.sample_view.sample_view import SamplePositionRequest, SampleView from alab_management.task_actor import run_task -from alab_management.task_manager.enums import _EXTRA_REQUEST -from alab_management.task_manager.resource_requester import ( - RequestMixin, - RequestStatus, -) from alab_management.task_view import TaskView from alab_management.task_view.task_enums import TaskStatus -from alab_management.utils.data_objects import get_collection +from alab_management.utils.middleware import register_abortable_middleware from alab_management.utils.module_ops import load_definition -class TaskManager(RequestMixin): +class TaskManager: """ TaskManager will. @@ -38,10 +26,8 @@ class TaskManager(RequestMixin): def __init__(self): load_definition() + register_abortable_middleware() self.task_view = TaskView() - self.sample_view = SampleView() - self.device_view = DeviceView() - self._request_collection = get_collection("requests") self.logger = DBLogger(task_id=None) super().__init__() @@ -51,12 +37,10 @@ def run(self): """Start the loop.""" while True: self._loop() - time.sleep(1) + # time.sleep(1) def _loop(self): self.handle_tasks_to_be_canceled() - self.handle_released_resources() - self.handle_requested_resources() self.submit_ready_tasks() def _clean_up_tasks_from_previous_runs(self): @@ -166,155 +150,3 @@ def handle_tasks_to_be_canceled(self): task_id=task_entry["task_id"], status=TaskStatus.CANCELLED, ) - - def handle_released_resources(self): - """Release the resources.""" - for request in self.get_requests_by_status(RequestStatus.NEED_RELEASE): - devices = request["assigned_devices"] - sample_positions = request["assigned_sample_positions"] - self._release_devices(devices) - self._release_sample_positions(sample_positions) - self.update_request_status( - request_id=request["_id"], status=RequestStatus.RELEASED - ) - - def handle_requested_resources(self): - """ - Check if there are any requests that are in PENDING status. If so, - try to assign the resources to it. - """ - requests = list(self.get_requests_by_status(RequestStatus.PENDING)) - # prioritize the oldest requests at the highest priority value - requests.sort(key=lambda x: x["submitted_at"]) - requests.sort(key=lambda x: x["priority"], reverse=True) - for request in requests: - self._handle_requested_resources(request) - - def _handle_requested_resources(self, request_entry: dict[str, Any]): - try: - resource_request = request_entry["request"] - task_id = request_entry["task_id"] - - task_status = self.task_view.get_status(task_id=task_id) - if task_status != TaskStatus.REQUESTING_RESOURCES: - # this implies the Task has been cancelled or errored somewhere else in the chain -- we should - # not allocate any resources to the broken Task. - self.update_request_status( - request_id=resource_request["_id"], - status=RequestStatus.CANCELED, - ) - return - - devices = self.device_view.request_devices( - task_id=task_id, - device_names_str=[ - entry["device"]["content"] - for entry in resource_request - if entry["device"]["identifier"] == "name" - ], - device_types_str=[ - entry["device"]["content"] - for entry in resource_request - if entry["device"]["identifier"] == "type" - ], - ) - # some devices are not available now - # the request cannot be fulfilled - if devices is None: - return - - # replace device placeholder in sample position request - # and make it into a single list - parsed_sample_positions_request = [] - for request in resource_request: - if request["device"]["identifier"] == _EXTRA_REQUEST: - device_prefix = "" - else: - device_name = devices[request["device"]["content"]]["name"] - device_prefix = f"{device_name}{SamplePosition.SEPARATOR}" - - for pos in request["sample_positions"]: - prefix = pos["prefix"] - # if this is a nested resource request, lets not prepend the device name twice. - if not prefix.startswith(device_prefix): - prefix = device_prefix + prefix - parsed_sample_positions_request.append( - SamplePositionRequest(prefix=prefix, number=pos["number"]) - ) - - self._request_collection.update_one( - {"_id": request_entry["_id"]}, - { - "$set": { - "parsed_sample_positions_request": [ - dict(spr) for spr in parsed_sample_positions_request - ] - } - }, - ) - sample_positions = self.sample_view.request_sample_positions( - task_id=task_id, sample_positions=parsed_sample_positions_request - ) - if sample_positions is None: - return - - # in case some errors happen, we will raise the error in the task process instead of the main process - except Exception as error: # pylint: disable=broad-except - self._request_collection.update_one( - {"_id": request_entry["_id"]}, - { - "$set": { - "status": RequestStatus.ERROR.name, - "error": dill.dumps(error), - "assigned_devices": None, - "assigned_sample_positions": None, - } - }, - ) - return - - # if both devices and sample positions can be satisfied - self._request_collection.update_one( - {"_id": request_entry["_id"]}, - { - "$set": { - "assigned_devices": devices, - "assigned_sample_positions": sample_positions, - "status": RequestStatus.FULFILLED.name, - "fulfilled_at": datetime.now(), - } - }, - ) - # label the resources as occupied - self._occupy_devices(devices=devices, task_id=task_id) - self._occupy_sample_positions( - sample_positions=sample_positions, task_id=task_id - ) - - def _occupy_devices(self, devices: dict[str, dict[str, Any]], task_id: ObjectId): - for device in devices.values(): - self.device_view.occupy_device( - device=cast(str, device["name"]), task_id=task_id - ) - - def _occupy_sample_positions( - self, sample_positions: dict[str, list[dict[str, Any]]], task_id: ObjectId - ): - for sample_positions_ in sample_positions.values(): - for sample_position_ in sample_positions_: - self.sample_view.lock_sample_position( - task_id, cast(str, sample_position_["name"]) - ) - - def _release_devices(self, devices: dict[str, dict[str, Any]]): - for device in devices.values(): - if device["need_release"]: - self.device_view.release_device(device["name"]) - - def _release_sample_positions( - self, sample_positions: dict[str, list[dict[str, Any]]] - ): - for sample_positions_ in sample_positions.values(): - for sample_position in sample_positions_: - if sample_position["need_release"]: - self.sample_view.release_sample_position(sample_position["name"]) diff --git a/alab_management/task_view/task_view.py b/alab_management/task_view/task_view.py index 152a93c0..8145f923 100644 --- a/alab_management/task_view/task_view.py +++ b/alab_management/task_view/task_view.py @@ -8,13 +8,11 @@ from bson import ObjectId -from alab_management.task_view.completed_task_view import CompletedTaskView +from alab_management.task_view import CompletedTaskView from alab_management.task_view.task import BaseTask, get_all_tasks from alab_management.task_view.task_enums import TaskStatus from alab_management.utils.data_objects import get_collection, make_bsonable -completed_task_view = CompletedTaskView() - class TaskView: """Task view manages the status, parameters of a task.""" @@ -22,11 +20,12 @@ class TaskView: def __init__(self): self._task_collection = get_collection("tasks") self._tasks_definition: dict[str, type[BaseTask]] = get_all_tasks() + self.completed_task_view = CompletedTaskView() def create_task( self, task_type: str, - samples: list[ObjectId], + samples: list[dict[str, Any]], parameters: dict[str, Any], prev_tasks: ObjectId | list[ObjectId] | None = None, next_tasks: ObjectId | list[ObjectId] | None = None, @@ -123,7 +122,7 @@ def get_task(self, task_id: ObjectId, encode: bool = False) -> dict[str, Any]: if result is None: # try to get a completed task entry try: - result = completed_task_view.get_task(task_id=task_id) + result = self.completed_task_view.get_task(task_id=task_id) except ValueError: result = None # couldn't find it here either diff --git a/alab_management/user_input.py b/alab_management/user_input.py index 745a9a8e..b1bc4453 100644 --- a/alab_management/user_input.py +++ b/alab_management/user_input.py @@ -12,8 +12,6 @@ from .config import AlabOSConfig -CONFIG = AlabOSConfig() - class UserRequestStatus(Enum): """Enum for user response.""" diff --git a/alab_management/utils/middleware.py b/alab_management/utils/middleware.py index e69de29b..9ab90dca 100644 --- a/alab_management/utils/middleware.py +++ b/alab_management/utils/middleware.py @@ -0,0 +1,11 @@ +from dramatiq import get_broker +from dramatiq_abort import Abortable, backends + +from alab_management.utils.data_objects import get_collection + + +def register_abortable_middleware(): + abortable = Abortable( + backend=backends.MongoDBBackend(collection=get_collection("abortable")) + ) + get_broker().add_middleware(abortable) diff --git a/pyproject.toml b/pyproject.toml index 1d1f73b9..a3e66246 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,28 +50,6 @@ dependencies = [ "dramatiq-abort @ git+https://github.com/idocx/dramatiq-abort", "slack_sdk>=3.19.5", "Flask-Cors>=3.0.10", - "pytest >= 6.2.5", - "pytest_reraise >= 2.1.1", - "pylint >= 2.11.1", - "flake8 ~= 4.0.1", - "pytest-env ~= 0.6.2", - "requests >= 2.26.0", - "flake8-bugbear >= 21.11.29", - "flake8-docstrings >= 1.6.0", - "black", - "sphinx >= 3.2.1", - "sphinx_book_theme", - "recommonmark ~= 0.7.1", - "sphinx-autodoc-typehints >= 1.12.0", - "pytest >= 6.2.5", - "pytest_reraise >= 2.1.1", - "pylint >= 2.11.1", - "flake8 ~= 4.0.1", - "pytest-env ~= 0.6.2", - "requests >= 2.26.0", - "flake8-bugbear >= 21.11.29", - "flake8-docstrings >= 1.6.0", - "ruff" ] [project.optional-dependencies] @@ -98,7 +76,11 @@ dev = [ "requests >= 2.26.0", "flake8-bugbear >= 21.11.29", "flake8-docstrings >= 1.6.0", - "pandas-stubs >= 2.1.1.230928" + "pandas-stubs >= 2.1.1.230928", + "requests >= 2.26.0", + "flake8-bugbear >= 21.11.29", + "flake8-docstrings >= 1.6.0", + "ruff" ] tests = ["pytest-cov==4.1.0", "pytest==7.4.1", "moto==4.2.2"] vis = ["matplotlib", "pydot"] @@ -192,3 +174,7 @@ line-length = 150 "**/tests/*" = ["D"] "**/fake_lab/*" = ["D", "B"] "docs/source/conf.py" = ["D"] + +[tool.pytest.ini_options] +testpaths = "./tests" +env = "ALABOS_CONFIG_PATH = ./tests/fake_lab/config.toml" diff --git a/tests/test_lab_view.py b/tests/test_lab_view.py index f302678c..b0e17671 100644 --- a/tests/test_lab_view.py +++ b/tests/test_lab_view.py @@ -9,6 +9,7 @@ from alab_management.lab_view import LabView from alab_management.sample_view import SampleView from alab_management.scripts.cleanup_lab import cleanup_lab +from alab_management.scripts.launch_lab import launch_resource_manager from alab_management.scripts.setup_lab import setup_lab from alab_management.task_manager.task_manager import TaskManager from alab_management.task_view import TaskView @@ -37,7 +38,7 @@ def setUp(self) -> None: self.device_list = self.device_view._device_list self.sample_view = SampleView() self.task_view = TaskView() - self.process = Process(target=launch_task_manager) + self.process = Process(target=launch_resource_manager) self.process.daemon = True self.process.start() time.sleep(1) @@ -77,7 +78,8 @@ def test_request_resources(self): None: { "furnace_table": 1, }, - } + }, + timeout=1, ) as (devices, sample_positions): self.assertDictEqual( { @@ -120,6 +122,6 @@ def test_request_resources_empty(self): ) lab_view = LabView(task_id=task_id) - with lab_view.request_resources({}) as (devices, sample_positions): + with lab_view.request_resources({}, timeout=1) as (devices, sample_positions): self.assertDictEqual({}, devices) self.assertEqual({}, sample_positions) diff --git a/tests/test_launch.py b/tests/test_launch.py index 5f8138f2..f89e48f5 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -1,4 +1,3 @@ -import datetime import subprocess import time import unittest @@ -27,10 +26,15 @@ def setUp(self) -> None: self.experiment_view = ExperimentView() self.main_process = subprocess.Popen(["alabos", "launch", "--port", "8896"]) self.worker_process = subprocess.Popen( - ["alabos", "launch_worker", "--processes", "4", "--threads", "1"] + ["alabos", "launch_worker", "--processes", "8", "--threads", "16"] ) time.sleep(3) # waiting for starting up + if self.main_process.poll() is not None: + raise RuntimeError("Main process failed to start") + if self.worker_process.poll() is not None: + raise RuntimeError("Worker process failed to start") + def tearDown(self) -> None: self.main_process.terminate() self.worker_process.terminate() @@ -82,24 +86,25 @@ def test_submit_experiment(self): exp_id = ObjectId(resp_json["data"]["exp_id"]) self.assertTrue("success", resp_json["status"]) exp_ids.append(exp_id) - time.sleep(3) - # self.assertEqual(9, self.task_view._task_collection.count_documents({})) - # print(list(self.task_view._task_collection.find({}))) - print(datetime.datetime.now()) - # self.assertTrue( - # all( - # task["status"] == "COMPLETED" - # for task in self.task_view._task_collection.find() - # ) - # ) - # self.assertTrue( - # all( - # task["result"] == task["_id"] - # for task in self.task_view._task_collection.find() - # ) - # ) + time.sleep(30) + self.assertEqual(9, self.task_view._task_collection.count_documents({})) + import rich + rich.print(list(self.task_view._task_collection.find({}))) + # print(datetime.datetime.now()) + self.assertTrue( + all( + task["status"] == "COMPLETED" + for task in self.task_view._task_collection.find() + ) + ) + self.assertTrue( + all( + task["result"] == task["_id"] + for task in self.task_view._task_collection.find() + ) + ) - # for exp_id in exp_ids: - # self.assertEqual( - # "COMPLETED", self.experiment_view.get_experiment(exp_id)["status"] - # ) + for exp_id in exp_ids: + self.assertEqual( + "COMPLETED", self.experiment_view.get_experiment(exp_id)["status"] + ) diff --git a/tests/test_task_manager.py b/tests/test_task_manager.py index bb54071c..68b2971a 100644 --- a/tests/test_task_manager.py +++ b/tests/test_task_manager.py @@ -7,12 +7,14 @@ from alab_management.device_view import DeviceTaskStatus, DeviceView from alab_management.device_view.device import get_all_devices +from alab_management.resource_manager.resource_requester import ResourceRequester from alab_management.sample_view import SampleView from alab_management.sample_view.sample_view import SamplePositionStatus from alab_management.scripts.cleanup_lab import cleanup_lab +from alab_management.scripts.launch_lab import launch_resource_manager from alab_management.scripts.setup_lab import setup_lab -from alab_management.task_manager.resource_requester import ResourceRequester from alab_management.task_manager.task_manager import TaskManager +from alab_management.task_view import TaskView def launch_task_manager(): @@ -25,7 +27,7 @@ def launch_task_manager(): raise -class TestTaskManager(unittest.TestCase): +class TestResourceManager(unittest.TestCase): def setUp(self) -> None: time.sleep(0.5) cleanup_lab( @@ -39,8 +41,15 @@ def setUp(self) -> None: self.devices = get_all_devices() self.device_view = DeviceView() self.sample_view = SampleView() - self.resource_requester = ResourceRequester(task_id=ObjectId()) - self.process = Process(target=launch_task_manager) + self.task_view = TaskView() + fake_task = self.task_view._task_collection.insert_one( + { + "type": "fake_task", + "status": "REQUESTING_RESOURCES", + } + ) + self.resource_requester = ResourceRequester(task_id=fake_task.inserted_id) + self.process = Process(target=launch_resource_manager) self.process.daemon = True self.process.start() time.sleep(0.5) From 2ca582111b92234ffb9a126c56b308de64fce826 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Tue, 30 Apr 2024 00:18:16 -0700 Subject: [PATCH 06/27] a workable example --- tests/test_task_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_task_manager.py b/tests/test_task_manager.py index 68b2971a..7b584292 100644 --- a/tests/test_task_manager.py +++ b/tests/test_task_manager.py @@ -3,8 +3,6 @@ from multiprocessing import Process from traceback import print_exc -from bson import ObjectId - from alab_management.device_view import DeviceTaskStatus, DeviceView from alab_management.device_view.device import get_all_devices from alab_management.resource_manager.resource_requester import ResourceRequester From d27b5592e068d4cfa6f80b6fc533afa692141c58 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Tue, 30 Apr 2024 17:50:42 -0700 Subject: [PATCH 07/27] a workable example --- alab_management/resource_manager/enums.py | 2 +- alab_management/task_manager/task_manager.py | 3 +- alab_management/utils/middleware.py | 33 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/alab_management/resource_manager/enums.py b/alab_management/resource_manager/enums.py index b9d55315..b117f13f 100644 --- a/alab_management/resource_manager/enums.py +++ b/alab_management/resource_manager/enums.py @@ -1,4 +1,4 @@ -from enum import auto, Enum +from enum import Enum, auto _EXTRA_REQUEST: str = "__nodevice" diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index 7723f046..f7a7fd41 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -12,7 +12,7 @@ from alab_management.task_actor import run_task from alab_management.task_view import TaskView from alab_management.task_view.task_enums import TaskStatus -from alab_management.utils.middleware import register_abortable_middleware +from alab_management.utils.middleware import patch_dramatiq, register_abortable_middleware from alab_management.utils.module_ops import load_definition @@ -27,6 +27,7 @@ class TaskManager: def __init__(self): load_definition() register_abortable_middleware() + patch_dramatiq() self.task_view = TaskView() self.logger = DBLogger(task_id=None) diff --git a/alab_management/utils/middleware.py b/alab_management/utils/middleware.py index 9ab90dca..75cdfb05 100644 --- a/alab_management/utils/middleware.py +++ b/alab_management/utils/middleware.py @@ -1,9 +1,42 @@ +import dramatiq from dramatiq import get_broker +from dramatiq.brokers.rabbitmq import RabbitmqBroker +from dramatiq.middleware.age_limit import AgeLimit +from dramatiq.middleware.callbacks import Callbacks +from dramatiq.middleware.pipelines import Pipelines +from dramatiq.middleware.prometheus import Prometheus +from dramatiq.middleware.retries import Retries +from dramatiq.middleware.shutdown import ShutdownNotifications +from dramatiq.middleware.time_limit import TimeLimit from dramatiq_abort import Abortable, backends from alab_management.utils.data_objects import get_collection +def patch_dramatiq(): + class PatchedPrometheus(Prometheus): + @property + def forks(self): + return [] + + broker_middleware = [ + PatchedPrometheus, AgeLimit, TimeLimit, + ShutdownNotifications, Callbacks, Pipelines, Retries + ] + + broker_middleware = [m() for m in broker_middleware] + + rabbitmq_broker = RabbitmqBroker( + host="127.0.0.1", + port=5672, + heartbeat=60, + connection_attempts=5, + blocked_connection_timeout=30, + middleware=broker_middleware + ) + dramatiq.set_broker(rabbitmq_broker) + + def register_abortable_middleware(): abortable = Abortable( backend=backends.MongoDBBackend(collection=get_collection("abortable")) From f9147bde62b292a238c5a8986304187ec3753346 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Tue, 30 Apr 2024 20:50:47 -0700 Subject: [PATCH 08/27] temp not working --- alab_management/scripts/launch_worker.py | 2 +- alab_management/task_actor.py | 3 +++ alab_management/task_manager/task_manager.py | 6 ++---- tests/test_launch.py | 8 +++++--- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/alab_management/scripts/launch_worker.py b/alab_management/scripts/launch_worker.py index 1f4b81f4..e27f5796 100644 --- a/alab_management/scripts/launch_worker.py +++ b/alab_management/scripts/launch_worker.py @@ -11,7 +11,7 @@ def launch_worker(args): from dramatiq.cli import make_argument_parser # Clean up any leftover tasks from previous runs. This blocks new workers until cleanup is done! - TaskManager()._clean_up_tasks_from_previous_runs() # pylint: disable=protected-access + TaskManager().clean_up_tasks_from_previous_runs() # pylint: disable=protected-access args = make_argument_parser().parse_args( args=["alab_management.task_actor", *args], diff --git a/alab_management/task_actor.py b/alab_management/task_actor.py index cd211a26..f66c2e57 100644 --- a/alab_management/task_actor.py +++ b/alab_management/task_actor.py @@ -13,8 +13,11 @@ from alab_management.logger import DBLogger from alab_management.sample_view import SampleView from alab_management.task_view import BaseTask, TaskStatus, TaskView +from alab_management.utils.middleware import register_abortable_middleware from alab_management.utils.module_ops import load_definition +register_abortable_middleware() + @dramatiq.actor( max_retries=0, diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index f7a7fd41..7c75658c 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -12,7 +12,7 @@ from alab_management.task_actor import run_task from alab_management.task_view import TaskView from alab_management.task_view.task_enums import TaskStatus -from alab_management.utils.middleware import patch_dramatiq, register_abortable_middleware +from alab_management.utils.middleware import register_abortable_middleware from alab_management.utils.module_ops import load_definition @@ -26,8 +26,6 @@ class TaskManager: def __init__(self): load_definition() - register_abortable_middleware() - patch_dramatiq() self.task_view = TaskView() self.logger = DBLogger(task_id=None) @@ -44,7 +42,7 @@ def _loop(self): self.handle_tasks_to_be_canceled() self.submit_ready_tasks() - def _clean_up_tasks_from_previous_runs(self): + def clean_up_tasks_from_previous_runs(self): """Cleans up incomplete tasks that exist from the last time the taskmanager was running. Note that this will block the task queue until all samples in these tasks have been removed from the physical lab (confirmed via user requests on the dashboard). diff --git a/tests/test_launch.py b/tests/test_launch.py index f89e48f5..465f6041 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -1,4 +1,5 @@ import subprocess +import sys import time import unittest @@ -24,9 +25,10 @@ def setUp(self) -> None: setup_lab() self.task_view = TaskView() self.experiment_view = ExperimentView() - self.main_process = subprocess.Popen(["alabos", "launch", "--port", "8896"]) + self.main_process = subprocess.Popen(["alabos", "launch", "--port", "8896"], stdout=sys.stdout, + stderr=sys.stderr) self.worker_process = subprocess.Popen( - ["alabos", "launch_worker", "--processes", "8", "--threads", "16"] + ["alabos", "launch_worker", "--processes", "8", "--threads", "1"], stdout=sys.stdout, stderr=sys.stderr ) time.sleep(3) # waiting for starting up @@ -86,7 +88,7 @@ def test_submit_experiment(self): exp_id = ObjectId(resp_json["data"]["exp_id"]) self.assertTrue("success", resp_json["status"]) exp_ids.append(exp_id) - time.sleep(30) + time.sleep(60) self.assertEqual(9, self.task_view._task_collection.count_documents({})) import rich rich.print(list(self.task_view._task_collection.find({}))) From 523aa55b27334953b8b38f5616f1ede23880db12 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Thu, 2 May 2024 15:23:39 -0700 Subject: [PATCH 09/27] commit from time to time --- tests/fake_lab/devices/device_that_fails.py | 0 tests/fake_lab/devices/device_that_never_ends.py | 0 tests/fake_lab/devices/device_that_run_slow.py | 0 tests/fake_lab/devices/larger_furnace.py | 0 tests/fake_lab/tasks/error_handling_task.py | 0 tests/fake_lab/tasks/infinite_task.py | 14 ++++++++++++++ tests/fake_lab/tasks/test_threading_task.py | 0 7 files changed, 14 insertions(+) create mode 100644 tests/fake_lab/devices/device_that_fails.py create mode 100644 tests/fake_lab/devices/device_that_never_ends.py create mode 100644 tests/fake_lab/devices/device_that_run_slow.py create mode 100644 tests/fake_lab/devices/larger_furnace.py create mode 100644 tests/fake_lab/tasks/error_handling_task.py create mode 100644 tests/fake_lab/tasks/infinite_task.py create mode 100644 tests/fake_lab/tasks/test_threading_task.py diff --git a/tests/fake_lab/devices/device_that_fails.py b/tests/fake_lab/devices/device_that_fails.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fake_lab/devices/device_that_never_ends.py b/tests/fake_lab/devices/device_that_never_ends.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fake_lab/devices/device_that_run_slow.py b/tests/fake_lab/devices/device_that_run_slow.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fake_lab/devices/larger_furnace.py b/tests/fake_lab/devices/larger_furnace.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fake_lab/tasks/error_handling_task.py b/tests/fake_lab/tasks/error_handling_task.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fake_lab/tasks/infinite_task.py b/tests/fake_lab/tasks/infinite_task.py new file mode 100644 index 00000000..2465287d --- /dev/null +++ b/tests/fake_lab/tasks/infinite_task.py @@ -0,0 +1,14 @@ +from bson import ObjectId +from ..devices.device_that_never_ends import DeviceThatNeverEnds + +from alab_management.task_view.task import BaseTask + + +class ErrorHandling(BaseTask): + def __init__(self, samples: list[ObjectId], *args, **kwargs): + super().__init__(samples=samples, *args, **kwargs) + self.sample = samples[0] + + def run(self): + with self.lab_view.request_resources({DeviceThatNeverEnds: {"infinite_loop": 1}}) as (device_that_fails, _): + device_that_fails.run_infinite() diff --git a/tests/fake_lab/tasks/test_threading_task.py b/tests/fake_lab/tasks/test_threading_task.py new file mode 100644 index 00000000..e69de29b From 3a0185389fd0ecd194c06cb3d4b3607fdabe3dc8 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Thu, 2 May 2024 15:57:43 -0700 Subject: [PATCH 10/27] update --- alab_management/experiment_manager.py | 7 +- alab_management/scripts/launch_worker.py | 13 ++- alab_management/task_actor.py | 3 - alab_management/task_manager/task_manager.py | 1 + alab_management/utils/data_objects.py | 2 +- tests/fake_lab/__init__.py | 15 ++- tests/fake_lab/devices/device_that_fails.py | 32 ++++++ .../devices/device_that_never_ends.py | 33 +++++++ .../fake_lab/devices/device_that_run_slow.py | 34 +++++++ tests/fake_lab/devices/furnace.py | 3 +- tests/fake_lab/devices/larger_furnace.py | 47 +++++++++ tests/fake_lab/tasks/error_handling_task.py | 14 +++ tests/fake_lab/tasks/infinite_task.py | 2 +- tests/fake_lab/tasks/test_threading_task.py | 14 +++ tests/test_launch.py | 97 ++++++++++++++++--- 15 files changed, 289 insertions(+), 28 deletions(-) diff --git a/alab_management/experiment_manager.py b/alab_management/experiment_manager.py index 3d80472b..0bca7560 100644 --- a/alab_management/experiment_manager.py +++ b/alab_management/experiment_manager.py @@ -92,10 +92,9 @@ def _handle_pending_experiment(self, experiment: dict[str, Any]): }, ) if task_graph.has_cycle(): - raise ValueError( - "Detect cycle in task graph, which is supposed " - "to be a DAG (directed acyclic graph)." - ) + self.experiment_view.update_experiment_status(experiment["_id"], ExperimentStatus.ERROR) + print(f"Experiment ({experiment['_id']}) has a cycle in the graph.") + return # create samples in the sample database sample_ids = { diff --git a/alab_management/scripts/launch_worker.py b/alab_management/scripts/launch_worker.py index e27f5796..83104095 100644 --- a/alab_management/scripts/launch_worker.py +++ b/alab_management/scripts/launch_worker.py @@ -1,4 +1,5 @@ """Launch Dramatiq worker to submit tasks.""" +import os from alab_management.task_manager.task_manager import TaskManager @@ -17,5 +18,15 @@ def launch_worker(args): args=["alab_management.task_actor", *args], namespace=Namespace(processes=6, threads=128), ) - launch(args=args) + + lock_file = os.path.expanduser("~/.alabos_worker_lock") + if os.path.exists(lock_file): + raise RuntimeError("Worker lock file exists. Another worker is already running.") + + with open(lock_file, "w") as f: + f.write(str(os.getpid())) + try: + launch(args=args) + finally: + os.remove(lock_file) return True diff --git a/alab_management/task_actor.py b/alab_management/task_actor.py index f66c2e57..cd211a26 100644 --- a/alab_management/task_actor.py +++ b/alab_management/task_actor.py @@ -13,11 +13,8 @@ from alab_management.logger import DBLogger from alab_management.sample_view import SampleView from alab_management.task_view import BaseTask, TaskStatus, TaskView -from alab_management.utils.middleware import register_abortable_middleware from alab_management.utils.module_ops import load_definition -register_abortable_middleware() - @dramatiq.actor( max_retries=0, diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py index 7c75658c..b155cd49 100644 --- a/alab_management/task_manager/task_manager.py +++ b/alab_management/task_manager/task_manager.py @@ -26,6 +26,7 @@ class TaskManager: def __init__(self): load_definition() + register_abortable_middleware() self.task_view = TaskView() self.logger = DBLogger(task_id=None) diff --git a/alab_management/utils/data_objects.py b/alab_management/utils/data_objects.py index cafa223c..52a7ee1f 100644 --- a/alab_management/utils/data_objects.py +++ b/alab_management/utils/data_objects.py @@ -29,7 +29,7 @@ def init(cls): @classmethod def get_collection(cls, name: str) -> collection.Collection: """Get collection by name.""" - if cls.client is None: + if cls.db is None: cls.init() return cls.db[name] # type: ignore # pylint: disable=unsubscriptable-object diff --git a/tests/fake_lab/__init__.py b/tests/fake_lab/__init__.py index 15431605..c7cb9cd4 100644 --- a/tests/fake_lab/__init__.py +++ b/tests/fake_lab/__init__.py @@ -2,18 +2,28 @@ from alab_management.sample_view import SamplePosition, add_standalone_sample_position from alab_management.task_view import add_task +from .devices.device_that_fails import DeviceThatFails +from .devices.device_that_never_ends import DeviceThatNeverEnds +from .devices.device_that_run_slow import DeviceThatRunSlow from .devices.furnace import Furnace from .devices.robot_arm import RobotArm from .tasks.ending import Ending +from .tasks.error_handling_task import ErrorHandling from .tasks.heating import Heating +from .tasks.infinite_task import InfiniteTask from .tasks.moving import Moving from .tasks.starting import Starting +from .tasks.test_threading_task import TestThreading add_device(Furnace(name="furnace_1")) add_device(Furnace(name="furnace_2")) add_device(Furnace(name="furnace_3")) add_device(Furnace(name="furnace_4")) add_device(RobotArm(name="dummy")) +add_device(DeviceThatFails(name="device_that_fails")) +add_device(DeviceThatNeverEnds(name="device_that_never_ends")) +add_device(DeviceThatRunSlow(name="device_that_run_slow")) +add_device(DeviceThatRunSlow(name="device_that_run_slow_2")) add_standalone_sample_position( SamplePosition( @@ -24,7 +34,7 @@ add_standalone_sample_position( SamplePosition( "furnace_temp", - number=4, + number=16, description="Test positions", ) ) @@ -33,3 +43,6 @@ add_task(Moving) add_task(Heating) add_task(Ending) +add_task(ErrorHandling) +add_task(InfiniteTask) +add_task(TestThreading) diff --git a/tests/fake_lab/devices/device_that_fails.py b/tests/fake_lab/devices/device_that_fails.py index e69de29b..544f6b8a 100644 --- a/tests/fake_lab/devices/device_that_fails.py +++ b/tests/fake_lab/devices/device_that_fails.py @@ -0,0 +1,32 @@ +from typing import ClassVar + +from alab_management.device_view import BaseDevice +from alab_management.sample_view import SamplePosition + + +class DeviceThatFails(BaseDevice): + description: ClassVar[str] = "DeviceThatFails" + + @property + def sample_positions(self): + return [ + SamplePosition( + "failures", + description="This is a fake device that always fails.", + ), + ] + + def emergent_stop(self): + pass + + def fail(self): + raise Exception("This device always fails.") + + def connect(self): + pass + + def disconnect(self): + pass + + def is_running(self) -> bool: + return False \ No newline at end of file diff --git a/tests/fake_lab/devices/device_that_never_ends.py b/tests/fake_lab/devices/device_that_never_ends.py index e69de29b..7fa13970 100644 --- a/tests/fake_lab/devices/device_that_never_ends.py +++ b/tests/fake_lab/devices/device_that_never_ends.py @@ -0,0 +1,33 @@ +from typing import ClassVar + +from alab_management.device_view import BaseDevice +from alab_management.sample_view import SamplePosition + + +class DeviceThatNeverEnds(BaseDevice): + description: ClassVar[str] = "DeviceThatNeverEnds" + + @property + def sample_positions(self): + return [ + SamplePosition( + "infinite_loop", + description="This is a fake device that never ends.", + ), + ] + + def emergent_stop(self): + pass + + def run_infinite(self): + while True: + pass + + def connect(self): + pass + + def disconnect(self): + pass + + def is_running(self) -> bool: + return False diff --git a/tests/fake_lab/devices/device_that_run_slow.py b/tests/fake_lab/devices/device_that_run_slow.py index e69de29b..afbff69a 100644 --- a/tests/fake_lab/devices/device_that_run_slow.py +++ b/tests/fake_lab/devices/device_that_run_slow.py @@ -0,0 +1,34 @@ +from typing import ClassVar + +from alab_management.device_view import BaseDevice +from alab_management.sample_view import SamplePosition + + +class DeviceThatRunSlow(BaseDevice): + description: ClassVar[str] = "DeviceThatRunsSlow" + + @property + def sample_positions(self): + return [ + SamplePosition( + "slow", + description="This is a fake device that runs slow.", + ), + ] + + def emergent_stop(self): + pass + + def run_slow(self): + import time + + time.sleep(100) + + def connect(self): + pass + + def disconnect(self): + pass + + def is_running(self) -> bool: + return False diff --git a/tests/fake_lab/devices/furnace.py b/tests/fake_lab/devices/furnace.py index 16e2f8cf..dfde377e 100644 --- a/tests/fake_lab/devices/furnace.py +++ b/tests/fake_lab/devices/furnace.py @@ -18,6 +18,7 @@ def sample_positions(self): SamplePosition( "inside", description="The position inside the furnace, where the samples are heated", + number=8, ), ] @@ -30,7 +31,7 @@ def run_program(self, *segments): def finish(): self._is_running = False - t = Timer(2, finish) + t = Timer(0.1, finish) t.start() def is_running(self): diff --git a/tests/fake_lab/devices/larger_furnace.py b/tests/fake_lab/devices/larger_furnace.py index e69de29b..5c52aeae 100644 --- a/tests/fake_lab/devices/larger_furnace.py +++ b/tests/fake_lab/devices/larger_furnace.py @@ -0,0 +1,47 @@ +from threading import Timer +from typing import ClassVar + +from alab_management.device_view import BaseDevice +from alab_management.sample_view import SamplePosition + + +class Furnace(BaseDevice): + description: ClassVar[str] = "Fake larger furnace" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._is_running = False + + @property + def sample_positions(self): + return [ + SamplePosition( + "inside", + description="The position inside the furnace, where the samples are heated", + number=16, + ), + ] + + def emergent_stop(self): + pass + + def run_program(self): + self._is_running = True + + def finish(): + self._is_running = False + + t = Timer(0.1, finish) + t.start() + + def is_running(self): + return self._is_running + + def get_temperature(self): + return 300 + + def connect(self): + pass + + def disconnect(self): + pass diff --git a/tests/fake_lab/tasks/error_handling_task.py b/tests/fake_lab/tasks/error_handling_task.py index e69de29b..15e35334 100644 --- a/tests/fake_lab/tasks/error_handling_task.py +++ b/tests/fake_lab/tasks/error_handling_task.py @@ -0,0 +1,14 @@ +from bson import ObjectId +from ..devices.device_that_fails import DeviceThatFails + +from alab_management.task_view.task import BaseTask + + +class ErrorHandling(BaseTask): + def __init__(self, samples: list[ObjectId], *args, **kwargs): + super().__init__(samples=samples, *args, **kwargs) + self.sample = samples[0] + + def run(self): + with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): + device_that_fails.fail() diff --git a/tests/fake_lab/tasks/infinite_task.py b/tests/fake_lab/tasks/infinite_task.py index 2465287d..71ecb251 100644 --- a/tests/fake_lab/tasks/infinite_task.py +++ b/tests/fake_lab/tasks/infinite_task.py @@ -4,7 +4,7 @@ from alab_management.task_view.task import BaseTask -class ErrorHandling(BaseTask): +class InfiniteTask(BaseTask): def __init__(self, samples: list[ObjectId], *args, **kwargs): super().__init__(samples=samples, *args, **kwargs) self.sample = samples[0] diff --git a/tests/fake_lab/tasks/test_threading_task.py b/tests/fake_lab/tasks/test_threading_task.py index e69de29b..12bf391a 100644 --- a/tests/fake_lab/tasks/test_threading_task.py +++ b/tests/fake_lab/tasks/test_threading_task.py @@ -0,0 +1,14 @@ +from bson import ObjectId +from ..devices.device_that_fails import DeviceThatFails + +from alab_management.task_view.task import BaseTask + + +class TestThreading(BaseTask): + def __init__(self, samples: list[ObjectId], *args, **kwargs): + super().__init__(samples=samples, *args, **kwargs) + self.sample = samples[0] + + def run(self): + with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): + device_that_fails.fail() diff --git a/tests/test_launch.py b/tests/test_launch.py index 465f6041..d292ec09 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -1,5 +1,4 @@ import subprocess -import sys import time import unittest @@ -25,21 +24,25 @@ def setUp(self) -> None: setup_lab() self.task_view = TaskView() self.experiment_view = ExperimentView() - self.main_process = subprocess.Popen(["alabos", "launch", "--port", "8896"], stdout=sys.stdout, - stderr=sys.stderr) + self.main_process = subprocess.Popen(["alabos", "launch", "--port", "8896"], shell=False) self.worker_process = subprocess.Popen( - ["alabos", "launch_worker", "--processes", "8", "--threads", "1"], stdout=sys.stdout, stderr=sys.stderr + ["alabos", "launch_worker", "--processes", "8", "--threads", "8"], + shell=False, ) - time.sleep(3) # waiting for starting up + time.sleep(2) # waiting for starting up if self.main_process.poll() is not None: raise RuntimeError("Main process failed to start") if self.worker_process.poll() is not None: raise RuntimeError("Worker process failed to start") + print("Main process started: pid", self.main_process.pid) + print("Worker process started: pid", self.worker_process.pid) + def tearDown(self) -> None: self.main_process.terminate() self.worker_process.terminate() + time.sleep(1) cleanup_lab( all_collections=True, _force_i_know_its_dangerous=True, @@ -49,8 +52,75 @@ def tearDown(self) -> None: ) def test_submit_experiment(self): + def compose_exp(exp_name, num_samples): + sample_names = [f"{exp_name}_sample_{i}" for i in range(num_samples)] + return { + "name": exp_name, + "tags": [], + "metadata": {}, + "samples": [{"name": sample_name_, "tags": [], "metadata": {}} for sample_name_ in sample_names], + "tasks": [ + *[{ + "type": "Starting", + "prev_tasks": [], + "parameters": { + "dest": "furnace_temp", + }, + "samples": [sample_name_], + } for sample_name_ in sample_names], + { + "type": "Heating", + "prev_tasks": list(range(len(sample_names))), + "parameters": { + "setpoints": ((1, 2),), + }, + "samples": sample_names, + }, + *[{ + "type": "Ending", + "prev_tasks": [len(sample_names)], + "parameters": {}, + "samples": [sample_name_], + } for sample_name_ in sample_names], + + ], + } + + exp_ids = [] + num_of_tasks = 0 + for i in range(8): + experiment = compose_exp(f"Experiment with {i+1} samples", num_samples=i+1) + num_of_tasks += len(experiment["tasks"]) + resp = requests.post( + "http://127.0.0.1:8896/api/experiment/submit", json=experiment + ) + resp_json = resp.json() + exp_id = ObjectId(resp_json["data"]["exp_id"]) + self.assertTrue("success", resp_json["status"]) + exp_ids.append(exp_id) + time.sleep(30) + self.assertEqual(num_of_tasks, self.task_view._task_collection.count_documents({})) + + self.assertTrue( + all( + task["status"] == "COMPLETED" + for task in self.task_view._task_collection.find() + ) + ) + self.assertTrue( + all( + task["result"] == task["_id"] + for task in self.task_view._task_collection.find() + ) + ) + + for exp_id in exp_ids: + self.assertEqual( + "COMPLETED", self.experiment_view.get_experiment(exp_id)["status"] + ) + + def test_user_input(self): experiment = { - "name": "test", "tags": [], "metadata": {}, "samples": [{"name": "test_sample", "tags": [], "metadata": {}}], @@ -64,16 +134,8 @@ def test_submit_experiment(self): "samples": ["test_sample"], }, { - "type": "Heating", + "type": "ErrorHandling", "prev_tasks": [0], - "parameters": { - "setpoints": ((1, 2),), - }, - "samples": ["test_sample"], - }, - { - "type": "Ending", - "prev_tasks": [1], "parameters": {}, "samples": ["test_sample"], }, @@ -81,6 +143,7 @@ def test_submit_experiment(self): } exp_ids = [] for _ in range(3): + experiment["name"] = f"Failed {_}" resp = requests.post( "http://127.0.0.1:8896/api/experiment/submit", json=experiment ) @@ -88,7 +151,9 @@ def test_submit_experiment(self): exp_id = ObjectId(resp_json["data"]["exp_id"]) self.assertTrue("success", resp_json["status"]) exp_ids.append(exp_id) - time.sleep(60) + + time.sleep(5) + self.assertEqual(9, self.task_view._task_collection.count_documents({})) import rich rich.print(list(self.task_view._task_collection.find({}))) From 22488f155fcf2ccb52477cd83bf9ceb630b8e3ee Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Fri, 3 May 2024 19:03:26 -0700 Subject: [PATCH 11/27] update --- tests/fake_lab/devices/larger_furnace.py | 47 ------------------------ 1 file changed, 47 deletions(-) delete mode 100644 tests/fake_lab/devices/larger_furnace.py diff --git a/tests/fake_lab/devices/larger_furnace.py b/tests/fake_lab/devices/larger_furnace.py deleted file mode 100644 index 5c52aeae..00000000 --- a/tests/fake_lab/devices/larger_furnace.py +++ /dev/null @@ -1,47 +0,0 @@ -from threading import Timer -from typing import ClassVar - -from alab_management.device_view import BaseDevice -from alab_management.sample_view import SamplePosition - - -class Furnace(BaseDevice): - description: ClassVar[str] = "Fake larger furnace" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._is_running = False - - @property - def sample_positions(self): - return [ - SamplePosition( - "inside", - description="The position inside the furnace, where the samples are heated", - number=16, - ), - ] - - def emergent_stop(self): - pass - - def run_program(self): - self._is_running = True - - def finish(): - self._is_running = False - - t = Timer(0.1, finish) - t.start() - - def is_running(self): - return self._is_running - - def get_temperature(self): - return 300 - - def connect(self): - pass - - def disconnect(self): - pass From a88921b9d13cef7eb85dbe473e25e80b6cb05311 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 13:06:20 -0700 Subject: [PATCH 12/27] update --- tests/fake_lab/__init__.py | 8 +- tests/fake_lab/devices/device_that_fails.py | 2 +- tests/fake_lab/tasks/error_handling_task.py | 24 +++- tests/fake_lab/tasks/heating.py | 20 +-- tests/test_launch.py | 140 +++++++++++++++----- 5 files changed, 144 insertions(+), 50 deletions(-) diff --git a/tests/fake_lab/__init__.py b/tests/fake_lab/__init__.py index c7cb9cd4..3d43a919 100644 --- a/tests/fake_lab/__init__.py +++ b/tests/fake_lab/__init__.py @@ -8,13 +8,14 @@ from .devices.furnace import Furnace from .devices.robot_arm import RobotArm from .tasks.ending import Ending -from .tasks.error_handling_task import ErrorHandling +from .tasks.error_handling_task import ErrorHandlingUnrecoverable, ErrorHandlingRecoverable from .tasks.heating import Heating from .tasks.infinite_task import InfiniteTask from .tasks.moving import Moving from .tasks.starting import Starting from .tasks.test_threading_task import TestThreading + add_device(Furnace(name="furnace_1")) add_device(Furnace(name="furnace_2")) add_device(Furnace(name="furnace_3")) @@ -34,7 +35,7 @@ add_standalone_sample_position( SamplePosition( "furnace_temp", - number=16, + number=64, description="Test positions", ) ) @@ -43,6 +44,7 @@ add_task(Moving) add_task(Heating) add_task(Ending) -add_task(ErrorHandling) +add_task(ErrorHandlingUnrecoverable) +add_task(ErrorHandlingRecoverable) add_task(InfiniteTask) add_task(TestThreading) diff --git a/tests/fake_lab/devices/device_that_fails.py b/tests/fake_lab/devices/device_that_fails.py index 544f6b8a..dd3e619b 100644 --- a/tests/fake_lab/devices/device_that_fails.py +++ b/tests/fake_lab/devices/device_that_fails.py @@ -20,7 +20,7 @@ def emergent_stop(self): pass def fail(self): - raise Exception("This device always fails.") + raise ValueError("This device always fails.") def connect(self): pass diff --git a/tests/fake_lab/tasks/error_handling_task.py b/tests/fake_lab/tasks/error_handling_task.py index 15e35334..fd5b2fad 100644 --- a/tests/fake_lab/tasks/error_handling_task.py +++ b/tests/fake_lab/tasks/error_handling_task.py @@ -1,14 +1,34 @@ from bson import ObjectId -from ..devices.device_that_fails import DeviceThatFails from alab_management.task_view.task import BaseTask +from ..devices.device_that_fails import DeviceThatFails + -class ErrorHandling(BaseTask): +class ErrorHandlingUnrecoverable(BaseTask): def __init__(self, samples: list[ObjectId], *args, **kwargs): super().__init__(samples=samples, *args, **kwargs) self.sample = samples[0] def run(self): with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): + device_that_fails = device_that_fails[DeviceThatFails] device_that_fails.fail() + + +class ErrorHandlingRecoverable(BaseTask): + def __init__(self, samples: list[ObjectId], *args, **kwargs): + super().__init__(samples=samples, *args, **kwargs) + self.sample = samples[0] + + def run(self): + with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): + device_that_fails = device_that_fails[DeviceThatFails] + try: + device_that_fails.fail() + except Exception as e: + response = self.lab_view.request_user_input("What should I do?", options=["OK", "Abort"]) + if response == "OK": + pass + else: + raise e diff --git a/tests/fake_lab/tasks/heating.py b/tests/fake_lab/tasks/heating.py index d30fbbb7..51fc122f 100644 --- a/tests/fake_lab/tasks/heating.py +++ b/tests/fake_lab/tasks/heating.py @@ -18,22 +18,22 @@ def __init__( ): super().__init__(samples=samples, *args, **kwargs) self.setpoints = setpoints - self.sample = samples[0] def run(self): - with self.lab_view.request_resources({Furnace: {"inside": 1}}) as ( + with self.lab_view.request_resources({Furnace: {"inside": 8}}) as ( devices, sample_positions, ): furnace = devices[Furnace] - inside_furnace = sample_positions[Furnace]["inside"][0] - - self.lab_view.run_subtask( - Moving, - sample=self.sample, - samples=[self.sample], - dest=inside_furnace, - ) + inside_furnaces = sample_positions[Furnace]["inside"] + + for sample, inside_furnace in zip(self.samples, inside_furnaces): + self.lab_view.run_subtask( + Moving, + sample=sample, + samples=self.samples, + dest=inside_furnace, + ) furnace.run_program(self.setpoints) diff --git a/tests/test_launch.py b/tests/test_launch.py index d292ec09..c077a43e 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -26,7 +26,7 @@ def setUp(self) -> None: self.experiment_view = ExperimentView() self.main_process = subprocess.Popen(["alabos", "launch", "--port", "8896"], shell=False) self.worker_process = subprocess.Popen( - ["alabos", "launch_worker", "--processes", "8", "--threads", "8"], + ["alabos", "launch_worker", "--processes", "8", "--threads", "16"], shell=False, ) time.sleep(2) # waiting for starting up @@ -120,30 +120,32 @@ def compose_exp(exp_name, num_samples): ) def test_user_input(self): - experiment = { - "tags": [], - "metadata": {}, - "samples": [{"name": "test_sample", "tags": [], "metadata": {}}], - "tasks": [ - { - "type": "Starting", - "prev_tasks": [], - "parameters": { - "dest": "furnace_table", + def compose_exp(exp_name, error_task): + return { + "name": exp_name, + "tags": [], + "metadata": {}, + "samples": [{"name": "test_sample", "tags": [], "metadata": {}}], + "tasks": [ + { + "type": "Starting", + "prev_tasks": [], + "parameters": { + "dest": "furnace_table", + }, + "samples": ["test_sample"], }, - "samples": ["test_sample"], - }, - { - "type": "ErrorHandling", - "prev_tasks": [0], - "parameters": {}, - "samples": ["test_sample"], - }, - ], - } + { + "type": error_task, + "prev_tasks": [0], + "parameters": {}, + "samples": ["test_sample"], + }, + ], + } exp_ids = [] - for _ in range(3): - experiment["name"] = f"Failed {_}" + for error_name in ["ErrorHandlingUnrecoverable", "ErrorHandlingRecoverable"]: + experiment = compose_exp(f"Experiment with {error_name}", error_task=error_name) resp = requests.post( "http://127.0.0.1:8896/api/experiment/submit", json=experiment ) @@ -151,22 +153,30 @@ def test_user_input(self): exp_id = ObjectId(resp_json["data"]["exp_id"]) self.assertTrue("success", resp_json["status"]) exp_ids.append(exp_id) - time.sleep(5) - self.assertEqual(9, self.task_view._task_collection.count_documents({})) - import rich - rich.print(list(self.task_view._task_collection.find({}))) - # print(datetime.datetime.now()) - self.assertTrue( - all( - task["status"] == "COMPLETED" - for task in self.task_view._task_collection.find() + pending_user_input = requests.get("http://127.0.0.1:8896/api/userinput/pending").json() + import rich + rich.print(pending_user_input) + self.assertEqual(len(pending_user_input["pending_requests"].get(str(exp_id), [])), 1) + + request_id = pending_user_input["pending_requests"][str(exp_id)][0]["id"] + + # acknowledge the request + resp = requests.post( + "http://127.0.0.1:8896/api/userinput/submit", + json={ + "request_id": request_id, + "response": "OK", + "note": "dummy", + }, ) - ) + self.assertEqual("success", resp.json()["status"]) + + time.sleep(5) self.assertTrue( all( - task["result"] == task["_id"] + task["status"] == "COMPLETED" or task["status"] == "ERROR" for task in self.task_view._task_collection.find() ) ) @@ -175,3 +185,65 @@ def test_user_input(self): self.assertEqual( "COMPLETED", self.experiment_view.get_experiment(exp_id)["status"] ) + + def test_cancel(self): + def compose_exp(exp_name): + return { + "name": exp_name, + "tags": [], + "metadata": {}, + "samples": [{"name": "test_sample", "tags": [], "metadata": {}}], + "tasks": [ + { + "type": "Starting", + "prev_tasks": [], + "parameters": { + "dest": "furnace_table", + }, + "samples": ["test_sample"], + }, + { + "type": "InfiniteTask", + "prev_tasks": [0], + "parameters": {}, + "samples": ["test_sample"], + }, + ], + } + experiment = compose_exp("Experiment with cancel") + resp = requests.post( + "http://127.0.0.1:8896/api/experiment/submit", json=experiment + ) + resp_json = resp.json() + exp_id = ObjectId(resp_json["data"]["exp_id"]) + self.assertTrue("success", resp_json["status"]) + + time.sleep(3) + self.assertEqual( + "RUNNING", self.experiment_view.get_experiment(exp_id)["status"] + ) + + resp = requests.get( + f"http://127.0.0.1:8896/api/experiment/cancel/{exp_id!s}", + ) + self.assertEqual("success", resp.json()["status"]) + time.sleep(3) + + pending_user_input = requests.get("http://127.0.0.1:8896/api/userinput/pending").json() + self.assertEqual(len(pending_user_input["pending_requests"].get(str(exp_id), [])), 1) + request_id = pending_user_input["pending_requests"][str(exp_id)][0]["id"] + # acknowledge the request + resp = requests.post( + "http://127.0.0.1:8896/api/userinput/submit", + json={ + "request_id": request_id, + "response": "OK", + "note": "dummy", + }, + ) + self.assertEqual("success", resp.json()["status"]) + + time.sleep(3) + self.assertEqual( + "COMPLETED", self.experiment_view.get_experiment(exp_id)["status"] + ) From 2dad85d9a5e59195419994992e7e97c5a90f8c64 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 13:43:03 -0700 Subject: [PATCH 13/27] working codes --- tests/fake_lab/tasks/test_threading_task.py | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 tests/fake_lab/tasks/test_threading_task.py diff --git a/tests/fake_lab/tasks/test_threading_task.py b/tests/fake_lab/tasks/test_threading_task.py deleted file mode 100644 index 12bf391a..00000000 --- a/tests/fake_lab/tasks/test_threading_task.py +++ /dev/null @@ -1,14 +0,0 @@ -from bson import ObjectId -from ..devices.device_that_fails import DeviceThatFails - -from alab_management.task_view.task import BaseTask - - -class TestThreading(BaseTask): - def __init__(self, samples: list[ObjectId], *args, **kwargs): - super().__init__(samples=samples, *args, **kwargs) - self.sample = samples[0] - - def run(self): - with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): - device_that_fails.fail() From d8c50037f42548a33e5cbfe68c9e305d60695ca8 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 13:45:00 -0700 Subject: [PATCH 14/27] working codes --- .github/workflows/ci.yaml | 2 +- alab_management/utils/module_ops.py | 63 +++-------------------------- tests/fake_lab/__init__.py | 2 - tests/fake_lab/tasks/__init__.py | 2 + tests/test_lab_view.py | 6 +-- tests/test_sample_view.py | 28 ++++++------- tests/test_task_manager.py | 18 ++++----- 7 files changed, 35 insertions(+), 86 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a2827af2..ad04f15b 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -43,7 +43,7 @@ jobs: - name: Set up environment run: | pip install --upgrade pip - pip install --quiet . + pip install --quiet '.[dev]' - name: Set up pyright run: | npm install -g pyright diff --git a/alab_management/utils/module_ops.py b/alab_management/utils/module_ops.py index 04d217f6..c8ddc522 100644 --- a/alab_management/utils/module_ops.py +++ b/alab_management/utils/module_ops.py @@ -7,8 +7,13 @@ from pathlib import Path +__imported_modules__ = {} + + def import_module_from_path(path: str | Path, parent_package: str | None = None): """Import a module by its path.""" + if path in __imported_modules__: # Avoid importing the same module twice + return __imported_modules__[path] if not isinstance(path, Path): path = Path(path) sys.path.insert(0, path.parent.as_posix()) @@ -21,6 +26,7 @@ def import_module_from_path(path: str | Path, parent_package: str | None = None) ) raise sys.path.pop(0) + __imported_modules__[path] = module return module @@ -39,60 +45,3 @@ def load_definition(): ) import_module_from_path(dir_to_import_from) - - -# def import_device_definitions(file_folder: str, module_name: str): -# """ -# Import device definition from files, which should be a instance of ``BaseDevice``, and we will -# call ``add_device`` to register them automatically - -# Args: -# file_folder: the folder that contains the device instances -# module_name: the name of module -# """ -# from ..device_view.device import BaseDevice, add_device, get_all_devices - -# file_folder_path = Path(file_folder).parent - -# for path in file_folder_path.iterdir(): -# file_name = path.relative_to(file_folder_path) -# if re.match(r"^[a-zA-Z][a-zA-Z0-9_]*(\.py)?$", file_name.name) is None: -# continue -# device_module = importlib.import_module( -# "." + re.sub(r"(\.py)$", "", path.name), package=module_name -# ) - -# for v in device_module.__dict__.values(): -# if isinstance(v, BaseDevice) and v not in get_all_devices().values(): -# add_device(v) - - -# def import_task_definitions(file_folder: str, module_name: str): -# """ -# Import task definitions from files, which should be a subclass of ``BaseTask``, and we will -# call ``add_task`` to register them automatically - -# Args: -# file_folder: the folder that contains the device instances -# module_name: the name of module -# """ -# from ..task_view.task import BaseTask, add_task, get_all_tasks - -# file_folder_path = Path(file_folder).parent - -# for path in file_folder_path.iterdir(): -# file_name = path.relative_to(file_folder_path) -# if re.match(r"^[a-zA-Z][a-zA-Z0-9_.]*(\.py)?$", file_name.name) is None: -# continue -# task_module = importlib.import_module( -# "." + re.sub(r"(\.py)$", "", path.name), package=module_name -# ) - -# for v in task_module.__dict__.values(): -# if ( -# isinstance(v, type) -# and v is not BaseTask -# and issubclass(v, BaseTask) -# and v not in get_all_tasks().values() -# ): -# add_task(v) diff --git a/tests/fake_lab/__init__.py b/tests/fake_lab/__init__.py index 3d43a919..3f5ca59d 100644 --- a/tests/fake_lab/__init__.py +++ b/tests/fake_lab/__init__.py @@ -13,7 +13,6 @@ from .tasks.infinite_task import InfiniteTask from .tasks.moving import Moving from .tasks.starting import Starting -from .tasks.test_threading_task import TestThreading add_device(Furnace(name="furnace_1")) @@ -47,4 +46,3 @@ add_task(ErrorHandlingUnrecoverable) add_task(ErrorHandlingRecoverable) add_task(InfiniteTask) -add_task(TestThreading) diff --git a/tests/fake_lab/tasks/__init__.py b/tests/fake_lab/tasks/__init__.py index 8768b295..07e815e6 100644 --- a/tests/fake_lab/tasks/__init__.py +++ b/tests/fake_lab/tasks/__init__.py @@ -1,4 +1,6 @@ from .ending import Ending +from .error_handling_task import ErrorHandlingRecoverable, ErrorHandlingUnrecoverable from .heating import Heating +from .infinite_task import InfiniteTask from .moving import Moving from .starting import Starting diff --git a/tests/test_lab_view.py b/tests/test_lab_view.py index b0e17671..9f2a3212 100644 --- a/tests/test_lab_view.py +++ b/tests/test_lab_view.py @@ -83,7 +83,7 @@ def test_request_resources(self): ) as (devices, sample_positions): self.assertDictEqual( { - Furnace: {"inside": ["furnace_1/inside"]}, + Furnace: {"inside": ["furnace_1/inside/1"]}, None: {"furnace_table": ["furnace_table"]}, }, sample_positions, @@ -93,7 +93,7 @@ def test_request_resources(self): self.assertEqual( "LOCKED", - self.sample_view.get_sample_position_status("furnace_1/inside")[0].name, + self.sample_view.get_sample_position_status("furnace_1/inside/1")[0].name, ) self.assertEqual( "LOCKED", @@ -105,7 +105,7 @@ def test_request_resources(self): self.assertEqual( "EMPTY", - self.sample_view.get_sample_position_status("furnace_1/inside")[0].name, + self.sample_view.get_sample_position_status("furnace_1/inside/1")[0].name, ) self.assertEqual( "EMPTY", diff --git a/tests/test_sample_view.py b/tests/test_sample_view.py index 1b257292..5b2bb89c 100644 --- a/tests/test_sample_view.py +++ b/tests/test_sample_view.py @@ -84,9 +84,9 @@ def test_create_sample(self): self.assertEqual("test", sample.name) # try to create samples with same name - sample_id = self.sample_view.create_sample("test", position="furnace_1/inside") + sample_id = self.sample_view.create_sample("test", position="furnace_1/inside/1") sample = self.sample_view.get_sample(sample_id=sample_id) - self.assertEqual("furnace_1/inside", sample.position) + self.assertEqual("furnace_1/inside/1", sample.position) self.assertEqual("test", sample.name) # try to create samples with non-exist positions @@ -399,22 +399,22 @@ def test_request_multiple_sample_positions(self): ) # try when requesting sample positions more than we have in the lab - with ( - self.assertRaises(ValueError), - self.request_sample_positions( - [{"prefix": "furnace_temp", "number": 5}], task_id - ), - ): - pass + with self.assertRaises(ValueError): # noqa: SIM117 + with ( + self.request_sample_positions( + [{"prefix": "furnace_temp", "number": 10000}], task_id + ), + ): + pass def test_request_multiple_sample_positions_multiple_tasks(self): task_id_1 = ObjectId() task_id_2 = ObjectId() with self.request_sample_positions( - [{"prefix": "furnace_temp", "number": 2}], task_id_1 + [{"prefix": "furnace_temp", "number": 32}], task_id_1 ) as sample_positions: - self.assertEqual(2, len(sample_positions["furnace_temp"])) + self.assertEqual(32, len(sample_positions["furnace_temp"])) self.assertTrue( sample_positions["furnace_temp"][0].startswith("furnace_temp") ) @@ -422,9 +422,9 @@ def test_request_multiple_sample_positions_multiple_tasks(self): sample_positions["furnace_temp"][1].startswith("furnace_temp") ) with self.request_sample_positions( - [{"prefix": "furnace_temp", "number": 2}], task_id_2 + [{"prefix": "furnace_temp", "number": 32}], task_id_2 ) as sample_positions_: - self.assertEqual(2, len(sample_positions_["furnace_temp"])) + self.assertEqual(32, len(sample_positions_["furnace_temp"])) self.assertTrue( sample_positions_["furnace_temp"][0].startswith("furnace_temp") ) @@ -432,6 +432,6 @@ def test_request_multiple_sample_positions_multiple_tasks(self): sample_positions_["furnace_temp"][1].startswith("furnace_temp") ) with self.request_sample_positions( - [{"prefix": "furnace_temp", "number": 4}], task_id_2 + [{"prefix": "furnace_temp", "number": 33}], task_id_2 ) as sample_positions_: self.assertIs(None, sample_positions_) diff --git a/tests/test_task_manager.py b/tests/test_task_manager.py index 7b584292..5256ce94 100644 --- a/tests/test_task_manager.py +++ b/tests/test_task_manager.py @@ -74,7 +74,7 @@ def test_task_requester(self): self.assertDictEqual( { "devices": {furnace_type: "furnace_1"}, - "sample_positions": {furnace_type: {"inside": ["furnace_1/inside"]}}, + "sample_positions": {furnace_type: {"inside": ["furnace_1/inside/1"]}}, "timeout_error": False, }, result, @@ -83,7 +83,7 @@ def test_task_requester(self): self.device_view.get_status("furnace_1"), DeviceTaskStatus.OCCUPIED ) self.assertEqual( - self.sample_view.get_sample_position_status("furnace_1/inside"), + self.sample_view.get_sample_position_status("furnace_1/inside/1"), (SamplePositionStatus.LOCKED, self.resource_requester.task_id), ) self.resource_requester.release_resources(_id) @@ -92,7 +92,7 @@ def test_task_requester(self): self.device_view.get_status("furnace_1"), DeviceTaskStatus.IDLE ) self.assertEqual( - self.sample_view.get_sample_position_status("furnace_1/inside"), + self.sample_view.get_sample_position_status("furnace_1/inside/1"), (SamplePositionStatus.EMPTY, None), ) @@ -104,7 +104,7 @@ def test_task_requester(self): self.assertDictEqual( { "devices": {furnace_type: "furnace_1"}, - "sample_positions": {furnace_type: {"inside": ["furnace_1/inside"]}}, + "sample_positions": {furnace_type: {"inside": ["furnace_1/inside/1"]}}, "timeout_error": False, }, result, @@ -113,7 +113,7 @@ def test_task_requester(self): self.device_view.get_status("furnace_1"), DeviceTaskStatus.OCCUPIED ) self.assertEqual( - self.sample_view.get_sample_position_status("furnace_1/inside"), + self.sample_view.get_sample_position_status("furnace_1/inside/1"), (SamplePositionStatus.LOCKED, self.resource_requester.task_id), ) self.resource_requester.release_resources(_id) @@ -121,7 +121,7 @@ def test_task_requester(self): self.device_view.get_status("furnace_1"), DeviceTaskStatus.IDLE ) self.assertEqual( - self.sample_view.get_sample_position_status("furnace_1/inside"), + self.sample_view.get_sample_position_status("furnace_1/inside/1"), (SamplePositionStatus.EMPTY, None), ) @@ -133,7 +133,7 @@ def test_task_requester(self): self.assertDictEqual( { "devices": {furnace_type: "furnace_1"}, - "sample_positions": {furnace_type: {"inside": ["furnace_1/inside"]}}, + "sample_positions": {furnace_type: {"inside": ["furnace_1/inside/1"]}}, "timeout_error": False, }, result, @@ -179,7 +179,7 @@ def test_task_requester(self): { "devices": {furnace_type: "furnace_1"}, "sample_positions": { - furnace_type: {"inside": ["furnace_1/inside"]}, + furnace_type: {"inside": ["furnace_1/inside/1"]}, None: { "furnace_temp": [ "furnace_temp/1", @@ -198,5 +198,5 @@ def test_task_requester(self): def test_task_request_wrong_number(self): with self.assertRaises(ValueError): self.resource_requester.request_resources( - {None: {"furnace_temp": 10}}, timeout=4 + {None: {"furnace_temp": 10000}}, timeout=4 ) From ed0f2569bde0ed4d1d57f30ea473cf8758bb6203 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 14:25:06 -0700 Subject: [PATCH 15/27] fix ci --- .github/workflows/ci.yaml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ad04f15b..80e75b7c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -43,7 +43,7 @@ jobs: - name: Set up environment run: | pip install --upgrade pip - pip install --quiet '.[dev]' + pip install '.[dev]' - name: Set up pyright run: | npm install -g pyright diff --git a/pyproject.toml b/pyproject.toml index a3e66246..a299b2fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,7 +65,7 @@ docs = [ dev = [ "pre-commit>=2.12.1", "sphinx >= 3.2.1", - "sphinx_book_theme == 0.1.7", + "sphinx_book_theme", "recommonmark ~= 0.7.1", "sphinx-autodoc-typehints >= 1.12.0", "pytest >= 6.2.5", From a7372bc7c80dae0ad89c7321a4a70a45b9a269f7 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 14:31:24 -0700 Subject: [PATCH 16/27] fix ci --- .github/workflows/page.yaml | 2 +- pyproject.toml | 7 +++---- tests/test_launch.py | 14 ++++++-------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/.github/workflows/page.yaml b/.github/workflows/page.yaml index e6c71956..2b5bcd23 100644 --- a/.github/workflows/page.yaml +++ b/.github/workflows/page.yaml @@ -21,7 +21,7 @@ jobs: with: python-version: '3.10' - name: Set up dependencies - run: pip install --quiet . + run: pip install '.' - name: Compile sphinx working-directory: . run: | diff --git a/pyproject.toml b/pyproject.toml index a299b2fa..6612aa88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,13 +61,12 @@ docs = [ "nbsphinx==0.9.3", "sphinx-copybutton==0.5.2", "sphinx==7.2.5", + "sphinx-autodoc-typehints >= 1.12.0", + "sphinx_book_theme", ] dev = [ "pre-commit>=2.12.1", - "sphinx >= 3.2.1", - "sphinx_book_theme", "recommonmark ~= 0.7.1", - "sphinx-autodoc-typehints >= 1.12.0", "pytest >= 6.2.5", "pytest_reraise >= 2.1.1", "pylint >= 2.11.1", @@ -82,7 +81,7 @@ dev = [ "flake8-docstrings >= 1.6.0", "ruff" ] -tests = ["pytest-cov==4.1.0", "pytest==7.4.1", "moto==4.2.2"] +tests = ["pytest-cov==4.1.0", "pytest==7.4.1", "moto==4.2.2", "pytest-env ~= 0.6.2"] vis = ["matplotlib", "pydot"] [project.urls] diff --git a/tests/test_launch.py b/tests/test_launch.py index c077a43e..8d0b1e02 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -42,7 +42,7 @@ def setUp(self) -> None: def tearDown(self) -> None: self.main_process.terminate() self.worker_process.terminate() - time.sleep(1) + time.sleep(5) cleanup_lab( all_collections=True, _force_i_know_its_dangerous=True, @@ -153,11 +153,9 @@ def compose_exp(exp_name, error_task): exp_id = ObjectId(resp_json["data"]["exp_id"]) self.assertTrue("success", resp_json["status"]) exp_ids.append(exp_id) - time.sleep(5) + time.sleep(10) pending_user_input = requests.get("http://127.0.0.1:8896/api/userinput/pending").json() - import rich - rich.print(pending_user_input) self.assertEqual(len(pending_user_input["pending_requests"].get(str(exp_id), [])), 1) request_id = pending_user_input["pending_requests"][str(exp_id)][0]["id"] @@ -173,7 +171,7 @@ def compose_exp(exp_name, error_task): ) self.assertEqual("success", resp.json()["status"]) - time.sleep(5) + time.sleep(10) self.assertTrue( all( task["status"] == "COMPLETED" or task["status"] == "ERROR" @@ -218,7 +216,7 @@ def compose_exp(exp_name): exp_id = ObjectId(resp_json["data"]["exp_id"]) self.assertTrue("success", resp_json["status"]) - time.sleep(3) + time.sleep(10) self.assertEqual( "RUNNING", self.experiment_view.get_experiment(exp_id)["status"] ) @@ -227,7 +225,7 @@ def compose_exp(exp_name): f"http://127.0.0.1:8896/api/experiment/cancel/{exp_id!s}", ) self.assertEqual("success", resp.json()["status"]) - time.sleep(3) + time.sleep(10) pending_user_input = requests.get("http://127.0.0.1:8896/api/userinput/pending").json() self.assertEqual(len(pending_user_input["pending_requests"].get(str(exp_id), [])), 1) @@ -243,7 +241,7 @@ def compose_exp(exp_name): ) self.assertEqual("success", resp.json()["status"]) - time.sleep(3) + time.sleep(10) self.assertEqual( "COMPLETED", self.experiment_view.get_experiment(exp_id)["status"] ) From 5d78442df05a0a8ead41ae97a29f1627dc98508b Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 14:43:51 -0700 Subject: [PATCH 17/27] fix ci --- .github/workflows/page.yaml | 2 +- alab_management/experiment_manager.py | 17 +++++----- alab_management/lab_view.py | 2 +- alab_management/resource_manager/__init__.py | 1 + alab_management/utils/middleware.py | 34 +------------------- alab_management/utils/module_ops.py | 1 - pyproject.toml | 6 ++-- tests/fake_lab/__init__.py | 3 +- tests/fake_lab/devices/device_that_fails.py | 2 +- tests/fake_lab/tasks/error_handling_task.py | 12 +++---- tests/fake_lab/tasks/infinite_task.py | 3 +- 11 files changed, 26 insertions(+), 57 deletions(-) diff --git a/.github/workflows/page.yaml b/.github/workflows/page.yaml index 2b5bcd23..0a37dc68 100644 --- a/.github/workflows/page.yaml +++ b/.github/workflows/page.yaml @@ -21,7 +21,7 @@ jobs: with: python-version: '3.10' - name: Set up dependencies - run: pip install '.' + run: pip install '.[docs]' - name: Compile sphinx working-directory: . run: | diff --git a/alab_management/experiment_manager.py b/alab_management/experiment_manager.py index 0bca7560..a2542379 100644 --- a/alab_management/experiment_manager.py +++ b/alab_management/experiment_manager.py @@ -6,7 +6,6 @@ done. """ -import time from typing import Any from .config import AlabOSConfig @@ -31,7 +30,7 @@ def __init__(self): config = AlabOSConfig() self.__copy_to_completed_db = ( - "mongodb_completed" in config + "mongodb_completed" in config ) # if this is not defined in the config, assume it this feature is not being used. if self.__copy_to_completed_db: self.completed_experiment_view = CompletedExperimentView() @@ -157,13 +156,13 @@ def mark_completed_experiments(self): # if all the tasks of an experiment have been finished if all( - self.task_view.get_status(task_id=task_id) - in { - TaskStatus.COMPLETED, - TaskStatus.ERROR, - TaskStatus.CANCELLED, - } - for task_id in task_ids + self.task_view.get_status(task_id=task_id) + in { + TaskStatus.COMPLETED, + TaskStatus.ERROR, + TaskStatus.CANCELLED, + } + for task_id in task_ids ): self.experiment_view.update_experiment_status( exp_id=experiment["_id"], status=ExperimentStatus.COMPLETED diff --git a/alab_management/lab_view.py b/alab_management/lab_view.py index e83d98c7..7c7010ee 100644 --- a/alab_management/lab_view.py +++ b/alab_management/lab_view.py @@ -18,9 +18,9 @@ from alab_management.device_view.device import BaseDevice from alab_management.experiment_view.experiment_view import ExperimentView from alab_management.logger import DBLogger +from alab_management.resource_manager.resource_requester import ResourceRequester from alab_management.sample_view.sample import Sample from alab_management.sample_view.sample_view import SamplePositionRequest, SampleView -from alab_management.resource_manager.resource_requester import ResourceRequester from alab_management.task_view.task import BaseTask from alab_management.task_view.task_enums import TaskPriority, TaskStatus from alab_management.task_view.task_view import TaskView diff --git a/alab_management/resource_manager/__init__.py b/alab_management/resource_manager/__init__.py index e69de29b..3babc708 100644 --- a/alab_management/resource_manager/__init__.py +++ b/alab_management/resource_manager/__init__.py @@ -0,0 +1 @@ +"""This file contains the resource manager module.""" diff --git a/alab_management/utils/middleware.py b/alab_management/utils/middleware.py index 75cdfb05..227532e0 100644 --- a/alab_management/utils/middleware.py +++ b/alab_management/utils/middleware.py @@ -1,43 +1,11 @@ -import dramatiq from dramatiq import get_broker -from dramatiq.brokers.rabbitmq import RabbitmqBroker -from dramatiq.middleware.age_limit import AgeLimit -from dramatiq.middleware.callbacks import Callbacks -from dramatiq.middleware.pipelines import Pipelines -from dramatiq.middleware.prometheus import Prometheus -from dramatiq.middleware.retries import Retries -from dramatiq.middleware.shutdown import ShutdownNotifications -from dramatiq.middleware.time_limit import TimeLimit from dramatiq_abort import Abortable, backends from alab_management.utils.data_objects import get_collection -def patch_dramatiq(): - class PatchedPrometheus(Prometheus): - @property - def forks(self): - return [] - - broker_middleware = [ - PatchedPrometheus, AgeLimit, TimeLimit, - ShutdownNotifications, Callbacks, Pipelines, Retries - ] - - broker_middleware = [m() for m in broker_middleware] - - rabbitmq_broker = RabbitmqBroker( - host="127.0.0.1", - port=5672, - heartbeat=60, - connection_attempts=5, - blocked_connection_timeout=30, - middleware=broker_middleware - ) - dramatiq.set_broker(rabbitmq_broker) - - def register_abortable_middleware(): + """This function registers the abortable middleware to the dramatiq broker.""" abortable = Abortable( backend=backends.MongoDBBackend(collection=get_collection("abortable")) ) diff --git a/alab_management/utils/module_ops.py b/alab_management/utils/module_ops.py index c8ddc522..5d10c483 100644 --- a/alab_management/utils/module_ops.py +++ b/alab_management/utils/module_ops.py @@ -6,7 +6,6 @@ from copy import copy from pathlib import Path - __imported_modules__ = {} diff --git a/pyproject.toml b/pyproject.toml index 6612aa88..2b522d46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,6 +145,9 @@ exclude_lines = [ [tool.ruff] target-version = "py310" +line-length = 150 + +[tool.ruff.lint] ignore-init-module-imports = true select = [ "B", # flake8-bugbear @@ -165,9 +168,8 @@ select = [ ] ignore = ["B028", "PLW0603", "RUF013", "D100", "D404", "D401", "D205", "RUF100"] pydocstyle.convention = "numpy" -line-length = 150 -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401"] "**/dashboard/*" = ["D"] "**/tests/*" = ["D"] diff --git a/tests/fake_lab/__init__.py b/tests/fake_lab/__init__.py index 3f5ca59d..dcdbac4f 100644 --- a/tests/fake_lab/__init__.py +++ b/tests/fake_lab/__init__.py @@ -8,13 +8,12 @@ from .devices.furnace import Furnace from .devices.robot_arm import RobotArm from .tasks.ending import Ending -from .tasks.error_handling_task import ErrorHandlingUnrecoverable, ErrorHandlingRecoverable +from .tasks.error_handling_task import ErrorHandlingRecoverable, ErrorHandlingUnrecoverable from .tasks.heating import Heating from .tasks.infinite_task import InfiniteTask from .tasks.moving import Moving from .tasks.starting import Starting - add_device(Furnace(name="furnace_1")) add_device(Furnace(name="furnace_2")) add_device(Furnace(name="furnace_3")) diff --git a/tests/fake_lab/devices/device_that_fails.py b/tests/fake_lab/devices/device_that_fails.py index dd3e619b..9d372304 100644 --- a/tests/fake_lab/devices/device_that_fails.py +++ b/tests/fake_lab/devices/device_that_fails.py @@ -29,4 +29,4 @@ def disconnect(self): pass def is_running(self) -> bool: - return False \ No newline at end of file + return False diff --git a/tests/fake_lab/tasks/error_handling_task.py b/tests/fake_lab/tasks/error_handling_task.py index fd5b2fad..a58a207b 100644 --- a/tests/fake_lab/tasks/error_handling_task.py +++ b/tests/fake_lab/tasks/error_handling_task.py @@ -2,7 +2,7 @@ from alab_management.task_view.task import BaseTask -from ..devices.device_that_fails import DeviceThatFails +from ..devices.device_that_fails import DeviceThatFails # noqa: TID252 class ErrorHandlingUnrecoverable(BaseTask): @@ -11,8 +11,8 @@ def __init__(self, samples: list[ObjectId], *args, **kwargs): self.sample = samples[0] def run(self): - with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): - device_that_fails = device_that_fails[DeviceThatFails] + with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (devices, _): + device_that_fails = devices[DeviceThatFails] device_that_fails.fail() @@ -22,10 +22,10 @@ def __init__(self, samples: list[ObjectId], *args, **kwargs): self.sample = samples[0] def run(self): - with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (device_that_fails, _): - device_that_fails = device_that_fails[DeviceThatFails] + with self.lab_view.request_resources({DeviceThatFails: {"failures": 1}}) as (devices, _): + device_that_fails_ = devices[DeviceThatFails] try: - device_that_fails.fail() + device_that_fails_.fail() except Exception as e: response = self.lab_view.request_user_input("What should I do?", options=["OK", "Abort"]) if response == "OK": diff --git a/tests/fake_lab/tasks/infinite_task.py b/tests/fake_lab/tasks/infinite_task.py index 71ecb251..92fbd5d4 100644 --- a/tests/fake_lab/tasks/infinite_task.py +++ b/tests/fake_lab/tasks/infinite_task.py @@ -1,8 +1,9 @@ from bson import ObjectId -from ..devices.device_that_never_ends import DeviceThatNeverEnds from alab_management.task_view.task import BaseTask +from ..devices.device_that_never_ends import DeviceThatNeverEnds # noqa: TID252 + class InfiniteTask(BaseTask): def __init__(self, samples: list[ObjectId], *args, **kwargs): From 58b6beb6e7342ffcdb563f372ea75e1f32283fb9 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 14:50:24 -0700 Subject: [PATCH 18/27] fix ci --- alab_management/task_view/task_view.py | 1 - pyproject.toml | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/alab_management/task_view/task_view.py b/alab_management/task_view/task_view.py index 8145f923..68782e68 100644 --- a/alab_management/task_view/task_view.py +++ b/alab_management/task_view/task_view.py @@ -465,4 +465,3 @@ def get_tasks_to_be_canceled(self) -> list[dict[str, Any]]: def exists(self, task_id: ObjectId | str) -> bool: """Check if a task id exists.""" return self._task_collection.find_one({"_id": ObjectId(task_id)}) is not None - diff --git a/pyproject.toml b/pyproject.toml index 2b522d46..814a5893 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,10 +63,10 @@ docs = [ "sphinx==7.2.5", "sphinx-autodoc-typehints >= 1.12.0", "sphinx_book_theme", + "recommonmark ~= 0.7.1", ] dev = [ "pre-commit>=2.12.1", - "recommonmark ~= 0.7.1", "pytest >= 6.2.5", "pytest_reraise >= 2.1.1", "pylint >= 2.11.1", From a7cf24a687e9fa216409885865d72dfde264927b Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 14:55:31 -0700 Subject: [PATCH 19/27] fix ci --- alab_management/scripts/launch_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alab_management/scripts/launch_worker.py b/alab_management/scripts/launch_worker.py index 83104095..84d0f1e5 100644 --- a/alab_management/scripts/launch_worker.py +++ b/alab_management/scripts/launch_worker.py @@ -23,7 +23,7 @@ def launch_worker(args): if os.path.exists(lock_file): raise RuntimeError("Worker lock file exists. Another worker is already running.") - with open(lock_file, "w") as f: + with open(lock_file, "w", encoding="utf-8") as f: f.write(str(os.getpid())) try: launch(args=args) From 3d5834e619ff51cef97affdb7a30d3900c316e09 Mon Sep 17 00:00:00 2001 From: Yuxing Fei Date: Sun, 5 May 2024 15:00:39 -0700 Subject: [PATCH 20/27] update frontend (not recompiled yet) --- client/src/dashboard/components/Experiments.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/src/dashboard/components/Experiments.js b/client/src/dashboard/components/Experiments.js index 9d415a19..78573f84 100644 --- a/client/src/dashboard/components/Experiments.js +++ b/client/src/dashboard/components/Experiments.js @@ -131,8 +131,6 @@ function Row({ experiment_id, hoverForId }) { return "inherit"; case "CANCELLED": return "gray"; - case "CANCELLING": - return "gray"; default: return "inherit"; } @@ -276,7 +274,7 @@ function Row({ experiment_id, hoverForId }) {