Skip to content

Commit

Permalink
Additional checks and Updated time.sleep for waiting for DB updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardusrendy committed Apr 20, 2024
1 parent 4f915e5 commit 32f5fc3
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 71 deletions.
18 changes: 9 additions & 9 deletions alab_management/device_view/device_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def sync_device_status(self):
)
# Wait until the device status has been updated to the target status
while self.get_status(device_name=device.name).name != status.name:
time.sleep(0.5)
time.sleep(0.1)

def add_devices_to_db(self):
"""
Expand Down Expand Up @@ -302,7 +302,7 @@ def occupy_device(self, device: BaseDevice | str, task_id: ObjectId):
device_name = device.name if isinstance(device, BaseDevice) else device
# Wait until the device status has been updated to OCCUPIED
while self.get_status(device_name=device_name).name != "OCCUPIED":
time.sleep(0.5)
time.sleep(0.1)

def get_devices_by_task(self, task_id: ObjectId | None) -> list[BaseDevice]:
"""Get devices given a task id (regardless of its status!)."""
Expand Down Expand Up @@ -341,7 +341,7 @@ def release_device(self, device_name: str):
)
# wait until the device status has been updated to IDLE
while self.get_status(device_name=device_name).name != "IDLE":
time.sleep(0.5)
time.sleep(0.1)

def get_samples_on_device(self, device_name: str):
"""Get all samples on a device."""
Expand Down Expand Up @@ -408,7 +408,7 @@ def _update_status(
)
# wait until the device status has been updated to target_status
while self.get_status(device_name=device_name).name != target_status.name:
time.sleep(0.5)
time.sleep(0.1)

def query_property(self, device_name: str, prop: str):
"""
Expand Down Expand Up @@ -454,7 +454,7 @@ def set_message(self, device_name: str, message: str):
self.get_device(device_name=device_name)["last_updated"]
== previous_update_time
):
time.sleep(0.5)
time.sleep(0.1)

def get_message(self, device_name: str) -> str:
"""Gets the current device message. Message is used to communicate device state with the user dashboard.
Expand Down Expand Up @@ -517,7 +517,7 @@ def set_all_attributes(self, device_name: str, attributes: dict):
self.get_device(device_name=device_name)["last_updated"]
== previous_update_time
):
time.sleep(0.5)
time.sleep(0.1)

def set_attribute(self, device_name: str, attribute: str, value: Any):
"""Sets a device attribute. Attributes are used to store device-specific values in the database.
Expand All @@ -543,7 +543,7 @@ def set_attribute(self, device_name: str, attribute: str, value: Any):
self.get_device(device_name=device_name)["last_updated"]
== previous_update_time
):
time.sleep(0.5)
time.sleep(0.1)

def pause_device(self, device_name: str):
"""Request pause for a specific device."""
Expand All @@ -568,7 +568,7 @@ def pause_device(self, device_name: str):
self.get_device(device_name=device_name)["pause_status"].name
!= new_pause_status
):
time.sleep(0.5)
time.sleep(0.1)

def unpause_device(self, device_name: str):
"""Unpause a device."""
Expand All @@ -595,7 +595,7 @@ def unpause_device(self, device_name: str):
while (
self.get_device(device_name=device_name)["pause_status"].name != "RELEASED"
):
time.sleep(0.5)
time.sleep(0.1)

def __exit__(self, exc_type, exc_value, traceback):
"""Disconnect from all devices when exiting the context manager."""
Expand Down
2 changes: 1 addition & 1 deletion alab_management/experiment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def run(self):
)
while True:
self._loop()
time.sleep(2)
time.sleep(0.1)

def _loop(self):
self.handle_pending_experiments()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def save_experiment(self, experiment_id: ObjectId):
)
is None
):
time.sleep(0.5)
time.sleep(0.1)
else:
self._completed_experiment_collection.insert_one(experiment_dict)

Expand Down
4 changes: 2 additions & 2 deletions alab_management/experiment_view/experiment_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def update_experiment_status(self, exp_id: ObjectId, status: ExperimentStatus):
)
# Wait until experiment status is updated in the database
while self.get_experiment(exp_id=exp_id)["status"] != status.name:
time.sleep(0.5)
time.sleep(0.1)

def update_sample_task_id(
self, exp_id, sample_ids: list[ObjectId], task_ids: list[ObjectId]
Expand Down Expand Up @@ -175,7 +175,7 @@ def update_sample_task_id(
updated_task_ids = [task["task_id"] for task in experiment["tasks"]]
if updated_sample_ids == sample_ids and updated_task_ids == task_ids:
update = "completed"
time.sleep(0.5)
time.sleep(0.1)

def get_experiment_by_task_id(self, task_id: ObjectId) -> dict[str, Any] | None:
"""Get an experiment that contains a task with the given task_id."""
Expand Down
25 changes: 15 additions & 10 deletions alab_management/lab_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,22 @@ def request_resources(
result = self._resource_requester.request_resources(
resource_request=resource_request, timeout=timeout, priority=priority
)
devices = result["devices"]
sample_positions = result["sample_positions"]
request_id = result["request_id"]
devices = {
device_type: self._device_client.create_device_wrapper(device_name)
for device_type, device_name in devices.items()
} # type: ignore
self._task_view.update_status(task_id=self.task_id, status=TaskStatus.RUNNING)
yield devices, sample_positions
error = result["error"]
if error:
self._resource_requester.release_resources(request_id=request_id)
raise error
else:
devices = result["devices"]
sample_positions = result["sample_positions"]
devices = {
device_type: self._device_client.create_device_wrapper(device_name)
for device_type, device_name in devices.items()
} # type: ignore
self._task_view.update_status(task_id=self.task_id, status=TaskStatus.RUNNING)
yield devices, sample_positions

self._resource_requester.release_resources(request_id=request_id)
self._resource_requester.release_resources(request_id=request_id)

def _sample_name_to_id(self, sample_name: str) -> ObjectId:
"""
Expand Down Expand Up @@ -179,7 +184,7 @@ def move_sample(self, sample: ObjectId | str, position: str | None):
# check if this sample is owned by current task
sample_entry = self.get_sample(sample=sample)
if sample_entry.task_id != self._task_id:
raise ValueError("Cannot move a sample that is not belong to this task.")
raise ValueError("Cannot move a sample that does not belong to this task.")

return self._sample_view.move_sample(
sample_id=sample_entry.sample_id, position=position
Expand Down
2 changes: 1 addition & 1 deletion alab_management/sample_view/completed_sample_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def save_sample(self, sample_id: ObjectId):
self._completed_sample_collection.find_one({"_id": ObjectId(sample_id)})
is None
):
time.sleep(0.5)
time.sleep(0.1)

def exists(self, sample_id: ObjectId | str) -> bool:
"""Check if a sample exists in the database.
Expand Down
22 changes: 11 additions & 11 deletions alab_management/sample_view/sample_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def add_sample_positions_to_db(
self._sample_positions_collection.insert_one(new_entry)
# Wait until the sample position is created
while self.get_sample_position(name) is None:
time.sleep(0.5)
time.sleep(0.1)

def clean_up_sample_position_collection(self):
"""Drop the sample position collection."""
Expand Down Expand Up @@ -323,7 +323,7 @@ def lock_sample_position(self, task_id: ObjectId, position: str):
)
# Wait until the position is locked successfully
while not self.is_locked_position(position):
time.sleep(0.5)
time.sleep(0.1)

def release_sample_position(self, position: str):
"""Unlock a sample position."""
Expand All @@ -340,7 +340,7 @@ def release_sample_position(self, position: str):
)
# Wait until the position is released successfully
while self.is_locked_position(position):
time.sleep(0.5)
time.sleep(0.1)

def get_sample_positions_by_task(self, task_id: ObjectId | None) -> list[str]:
"""Get the list of sample positions that is locked by a task (given task id)."""
Expand Down Expand Up @@ -396,7 +396,7 @@ def create_sample(
result = self._sample_collection.insert_one(entry)
# Wait until the sample is created
while not self.exists(result.inserted_id):
time.sleep(0.5)
time.sleep(0.1)
return cast(ObjectId, result.inserted_id)

def get_sample(self, sample_id: ObjectId) -> Sample:
Expand Down Expand Up @@ -442,8 +442,8 @@ def update_sample_task_id(self, sample_id: ObjectId, task_id: ObjectId | None):
},
)
# Wait until the task id is updated
while self.get_sample(sample_id).task_id != task_id:
time.sleep(0.5)
while self._sample_collection.find_one({"_id": sample_id})["task_id"] != task_id:
time.sleep(0.1)

def update_sample_metadata(self, sample_id: ObjectId, metadata: dict[str, Any]):
"""Update the metadata for a sample. This adds new metadata or updates existing metadata."""
Expand All @@ -459,8 +459,8 @@ def update_sample_metadata(self, sample_id: ObjectId, metadata: dict[str, Any]):
{"$set": update_dict},
)
# Wait until the metadata is updated
while self.get_sample(sample_id).last_updated == previous_update_time:
time.sleep(0.5)
while self._sample_collection.find_one({"_id": sample_id})["last_updated"] == previous_update_time:
time.sleep(0.1)

def move_sample(self, sample_id: ObjectId, position: str | None):
"""Update the sample with new position."""
Expand All @@ -475,7 +475,7 @@ def move_sample(self, sample_id: ObjectId, position: str | None):
raise ValueError(
f"Requested position ({position}) is not EMPTY or LOCKED by other task."
)

previous_update_time = result["last_updated"]
self._sample_collection.update_one(
{"_id": sample_id},
{
Expand All @@ -486,8 +486,8 @@ def move_sample(self, sample_id: ObjectId, position: str | None):
},
)
# Wait until the position is updated
while self.get_sample(sample_id).position != position:
time.sleep(0.5)
while self._sample_collection.find_one({"_id": sample_id})["last_updated"] == previous_update_time:
time.sleep(0.1)

def exists(self, sample_id: ObjectId | str) -> bool:
"""Check if a sample exists in the database.
Expand Down
2 changes: 1 addition & 1 deletion alab_management/scripts/launch_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def launch_lab(host, port, debug):
task_launcher_thread.start()

while True:
time.sleep(0.001)
time.sleep(0.1)
if not experiment_manager_thread.is_alive():
sys.exit(1001)

Expand Down
2 changes: 1 addition & 1 deletion alab_management/scripts/launch_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def launch_worker(args):

args = make_argument_parser().parse_args(
args=["alab_management.task_actor", *args],
namespace=Namespace(processes=4, threads=128),
namespace=Namespace(processes=4, threads=128*4),
)
launch(args=args)
return True
34 changes: 25 additions & 9 deletions alab_management/task_manager/resource_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def update_request_status(self, request_id: ObjectId, status: RequestStatus):
while (
self.get_request(request_id, projection=["status"])["status"] != status.name
):
time.sleep(0.5)
time.sleep(0.1)
return value_returned

def get_request(self, request_id: ObjectId, **kwargs) -> dict[str, Any] | None:
Expand Down Expand Up @@ -226,15 +226,17 @@ def request_resources(
while (self.get_request(_id, projection=["status"]))[
"status"
] != "CANCELED":
time.sleep(0.5)
raise
time.sleep(0.1)
return {"request_id": result["request_id"],
"error": TimeoutError}
return {
**self._post_process_requested_resource(
devices=result["devices"],
sample_positions=result["sample_positions"],
resource_request=resource_request,
),
"request_id": result["request_id"],
"error": None
}

def release_resources(self, request_id: ObjectId) -> bool:
Expand All @@ -251,14 +253,21 @@ def release_resources(self, request_id: ObjectId) -> bool:
},
)

# wait for the request to be released
# wait for the request to update
while (
self.get_request(request_id, projection=["status"])["status"]
!= "NEED_RELEASE"
):
time.sleep(0.5)
time.sleep(0.1)

# wait for the request to be released
while (
self.get_request(request_id, projection=["status"])["status"]
!= RequestStatus.RELEASED.name
):
time.sleep(0.1)

return result.modified_count == 1
return result.modified_count == 2

def release_all_resources(self):
"""
Expand Down Expand Up @@ -291,12 +300,19 @@ def release_all_resources(self):
}
},
)
# wait for all the requests to be released
# wait for all the requests to be updated
while any(
request["status"] == RequestStatus.NEED_RELEASE.name
for request in self.get_requests_by_task_id(self.task_id)
):
time.sleep(0.5)
time.sleep(0.1)

# wait for all the requests to be released
while any(
request["status"] != RequestStatus.RELEASED.name
for request in self.get_requests_by_task_id(self.task_id)
):
time.sleep(0.1)

def _check_request_status_loop(self):
while not self._stop:
Expand All @@ -310,7 +326,7 @@ def _check_request_status_loop(self):
except Exception:
print_exc() # for debugging in the test
raise
time.sleep(0.5)
time.sleep(0.1)

def _handle_fulfilled_request(self, request_id: ObjectId):
entry = self.get_request(request_id)
Expand Down
Loading

0 comments on commit 32f5fc3

Please sign in to comment.