diff --git a/alab_management/experiment_manager.py b/alab_management/experiment_manager.py
index a2542379..16225083 100644
--- a/alab_management/experiment_manager.py
+++ b/alab_management/experiment_manager.py
@@ -5,7 +5,7 @@
 tasks and samples and mark the finished tasks in the database when it is done.
 """
 
-
+import time
 from typing import Any
 
 from .config import AlabOSConfig
@@ -46,7 +46,7 @@ def run(self):
         )
         while True:
             self._loop()
-            # time.sleep()
+            time.sleep(1)
 
     def _loop(self):
         self.handle_pending_experiments()
diff --git a/alab_management/resource_manager/resource_manager.py b/alab_management/resource_manager/resource_manager.py
index f9124f2e..01119668 100644
--- a/alab_management/resource_manager/resource_manager.py
+++ b/alab_management/resource_manager/resource_manager.py
@@ -49,7 +49,7 @@ def run(self):
         """Start the loop."""
         while True:
             self._loop()
-            # time.sleep(1)
+            time.sleep(0.5)
 
     def _loop(self):
         self.handle_released_resources()
@@ -85,7 +85,8 @@ def _handle_requested_resources(self, request_entry: dict[str, Any]):
         task_status = self.task_view.get_status(task_id=task_id)
 
         if (task_status != TaskStatus.REQUESTING_RESOURCES or
-                self.task_view.get_tasks_to_be_canceled(canceling_progress=CancelingProgress.WORKER_NOTIFIED)):
+                task_id in {task["task_id"] for task in self.task_view.get_tasks_to_be_canceled(
+                    canceling_progress=CancelingProgress.WORKER_NOTIFIED)}):
             # this implies the Task has been cancelled or errored somewhere else in the chain -- we should
             # not allocate any resources to the broken Task.
             self.update_request_status(
diff --git a/alab_management/resource_manager/resource_requester.py b/alab_management/resource_manager/resource_requester.py
index 2988b5c5..496adaaa 100644
--- a/alab_management/resource_manager/resource_requester.py
+++ b/alab_management/resource_manager/resource_requester.py
@@ -2,7 +2,7 @@
 TaskLauncher is the core module of the system, which actually executes the tasks.
 """
 
-
+import concurrent.futures
 import time
 from concurrent.futures import Future
 from datetime import datetime
@@ -28,6 +28,24 @@
 ]  # the raw request sent by task process
 
 
+# Since Python 3.11 ``concurrent.futures.TimeoutError`` is an alias of the builtin
+# ``TimeoutError``; listing the same class twice as a base raises
+# ``TypeError: duplicate base class``, so the bases are de-duplicated first.
+_TIMEOUT_BASES = tuple(
+    dict.fromkeys((TimeoutError, concurrent.futures.TimeoutError))
+)
+
+
+class CombinedTimeoutError(*_TIMEOUT_BASES):
+    """
+    Timeout raised when a resource request is not fulfilled in time.
+
+    It derives from both the builtin ``TimeoutError`` and
+    ``concurrent.futures.TimeoutError``, so callers that catch either one will
+    catch this exception.
+    """
+
+
 class ResourcesRequest(BaseModel):
     """
     This class is used to validate the resource request. Each request should have a format of
@@ -231,7 +249,14 @@ def request_resources(
         )  # DB_ACCESS_OUTSIDE_VIEW
         _id: ObjectId = cast(ObjectId, result.inserted_id)
         self._waiting[_id] = {"f": f, "device_str_to_request": device_str_to_request}
-        result = f.result(timeout=timeout)
+        try:
+            result = f.result(timeout=timeout)
+        except concurrent.futures.TimeoutError as e:
+            # re-raise as a type that both ``except TimeoutError`` and
+            # ``except concurrent.futures.TimeoutError`` handlers will catch
+            raise CombinedTimeoutError(
+                f"Request {_id} timed out after {timeout} seconds."
+            ) from e
         return {
             **self._post_process_requested_resource(
                 devices=result["devices"],
diff --git a/alab_management/scripts/launch_lab.py b/alab_management/scripts/launch_lab.py
index f77c1c10..d79b6929 100644
--- a/alab_management/scripts/launch_lab.py
+++ b/alab_management/scripts/launch_lab.py
@@ -87,7 +87,7 @@ def launch_lab(host, port, debug):
 
     dashboard_thread.daemon = experiment_manager_thread.daemon = (
         task_launcher_thread.daemon
-    ) = device_manager_thread.daemon = True
+    ) = device_manager_thread.daemon = resource_manager_thread.daemon = True
 
     dashboard_thread.start()
     device_manager_thread.start()
diff --git a/alab_management/task_manager/task_manager.py b/alab_management/task_manager/task_manager.py
index 2f02d8ba..7ccc268c 100644
--- a/alab_management/task_manager/task_manager.py
+++ b/alab_management/task_manager/task_manager.py
@@ -34,7 +34,7 @@ def run(self):
         """Start the loop."""
         while True:
             self._loop()
-            # time.sleep(1)
+            time.sleep(1)
 
     def _loop(self):
         self.handle_tasks_to_be_canceled()
diff --git a/tests/test_launch.py b/tests/test_launch.py
index f7199dbc..75d91d40 100644
--- a/tests/test_launch.py
+++ b/tests/test_launch.py
@@ -97,7 +97,7 @@ def compose_exp(exp_name, num_samples):
             exp_id = ObjectId(resp_json["data"]["exp_id"])
             self.assertTrue("success", resp_json["status"])
             exp_ids.append(exp_id)
-        time.sleep(30)
+        time.sleep(50)
         self.assertEqual(num_of_tasks, self.task_view._task_collection.count_documents({}))
 
         self.assertTrue(
@@ -153,7 +153,7 @@ def compose_exp(exp_name, error_task):
             exp_id = ObjectId(resp_json["data"]["exp_id"])
             self.assertTrue("success", resp_json["status"])
             exp_ids.append(exp_id)
-        time.sleep(10)
+        time.sleep(20)
         pending_user_input = requests.get("http://127.0.0.1:8896/api/userinput/pending").json()
         self.assertEqual(len(pending_user_input["pending_requests"].get(str(exp_id), [])), 1)
 
@@ -206,6 +206,14 @@ def compose_exp(exp_name):
                         "parameters": {},
                         "samples": ["test_sample"],
                     },
+                    {
+                        "type": "Heating",
+                        "prev_tasks": [1],
+                        "parameters": {
+                            "setpoints": ((1, 2),),
+                        },
+                        "samples": ["test_sample"],
+                    },
                 ],
             }
 