Skip to content

Commit

Permalink
Fix the cancel tasks (#69)
Browse files Browse the repository at this point in the history
* Add back the time.sleep call in each manager loop to keep resource usage low.

* Hotfix for canceling tasks

* Hotfix for the resource-request timeout

---------

Co-authored-by: Yuxing Fei <[email protected]>
  • Loading branch information
bernardusrendy and idocx authored May 14, 2024
1 parent 061f298 commit 327e667
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 10 deletions.
4 changes: 2 additions & 2 deletions alab_management/experiment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
tasks and samples and mark the finished tasks in the database when it is
done.
"""

import time
from typing import Any

from .config import AlabOSConfig
Expand Down Expand Up @@ -46,7 +46,7 @@ def run(self):
)
while True:
self._loop()
# time.sleep()
time.sleep(1)

def _loop(self):
self.handle_pending_experiments()
Expand Down
5 changes: 3 additions & 2 deletions alab_management/resource_manager/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def run(self):
"""Start the loop."""
while True:
self._loop()
# time.sleep(1)
time.sleep(0.5)

def _loop(self):
self.handle_released_resources()
Expand Down Expand Up @@ -85,7 +85,8 @@ def _handle_requested_resources(self, request_entry: dict[str, Any]):

task_status = self.task_view.get_status(task_id=task_id)
if (task_status != TaskStatus.REQUESTING_RESOURCES or
self.task_view.get_tasks_to_be_canceled(canceling_progress=CancelingProgress.WORKER_NOTIFIED)):
task_id in {task["task_id"] for task in self.task_view.get_tasks_to_be_canceled(
canceling_progress=CancelingProgress.WORKER_NOTIFIED)}):
# this implies the Task has been cancelled or errored somewhere else in the chain -- we should
# not allocate any resources to the broken Task.
self.update_request_status(
Expand Down
17 changes: 15 additions & 2 deletions alab_management/resource_manager/resource_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
TaskLauncher is the core module of the system,
which actually executes the tasks.
"""

import concurrent
import time
from concurrent.futures import Future
from datetime import datetime
Expand All @@ -28,6 +28,14 @@
] # the raw request sent by task process


class CombinedTimeoutError(
    # De-duplicate the bases while keeping their order: on Python 3.11+
    # ``concurrent.futures.TimeoutError`` is an alias of the builtin
    # ``TimeoutError``, and naming the same class twice as a base raises
    # ``TypeError: duplicate base class`` at class-creation time.
    *dict.fromkeys((TimeoutError, concurrent.futures.TimeoutError))
):
    """
    Timeout error that can be caught as either timeout exception type.

    A handler written as ``except TimeoutError`` and a handler written as
    ``except concurrent.futures.TimeoutError`` will both catch this exception,
    regardless of whether the two names refer to distinct classes (older
    Python versions) or to the same class (Python 3.11 and later).
    """


class ResourcesRequest(BaseModel):
"""
This class is used to validate the resource request. Each request should have a format of
Expand Down Expand Up @@ -231,7 +239,12 @@ def request_resources(
) # DB_ACCESS_OUTSIDE_VIEW
_id: ObjectId = cast(ObjectId, result.inserted_id)
self._waiting[_id] = {"f": f, "device_str_to_request": device_str_to_request}
result = f.result(timeout=timeout)
try:
result = f.result(timeout=timeout)
except concurrent.futures.TimeoutError as e:
raise CombinedTimeoutError(
f"Request {result.inserted_id} timed out after {timeout} seconds."
) from e
return {
**self._post_process_requested_resource(
devices=result["devices"],
Expand Down
2 changes: 1 addition & 1 deletion alab_management/scripts/launch_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def launch_lab(host, port, debug):

dashboard_thread.daemon = experiment_manager_thread.daemon = (
task_launcher_thread.daemon
) = device_manager_thread.daemon = True
) = device_manager_thread.daemon = resource_manager_thread.daemon = True

dashboard_thread.start()
device_manager_thread.start()
Expand Down
2 changes: 1 addition & 1 deletion alab_management/task_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def run(self):
"""Start the loop."""
while True:
self._loop()
# time.sleep(1)
time.sleep(1)

def _loop(self):
self.handle_tasks_to_be_canceled()
Expand Down
12 changes: 10 additions & 2 deletions tests/test_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def compose_exp(exp_name, num_samples):
exp_id = ObjectId(resp_json["data"]["exp_id"])
self.assertTrue("success", resp_json["status"])
exp_ids.append(exp_id)
time.sleep(30)
time.sleep(50)
self.assertEqual(num_of_tasks, self.task_view._task_collection.count_documents({}))

self.assertTrue(
Expand Down Expand Up @@ -153,7 +153,7 @@ def compose_exp(exp_name, error_task):
exp_id = ObjectId(resp_json["data"]["exp_id"])
self.assertTrue("success", resp_json["status"])
exp_ids.append(exp_id)
time.sleep(10)
time.sleep(20)

pending_user_input = requests.get("http://127.0.0.1:8896/api/userinput/pending").json()
self.assertEqual(len(pending_user_input["pending_requests"].get(str(exp_id), [])), 1)
Expand Down Expand Up @@ -206,6 +206,14 @@ def compose_exp(exp_name):
"parameters": {},
"samples": ["test_sample"],
},
{
"type": "Heating",
"prev_tasks": [1],
"parameters": {
"setpoints": ((1, 2),),
},
"samples": ["test_sample"],
},
],
}

Expand Down

0 comments on commit 327e667

Please sign in to comment.