Skip to content

Commit

Permalink
time.sleep(0.5)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardusrendy committed Apr 21, 2024
1 parent 8cb7ec6 commit 70ab5b2
Show file tree
Hide file tree
Showing 17 changed files with 59 additions and 62 deletions.
26 changes: 13 additions & 13 deletions alab_management/device_view/device_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ def sync_device_status(self):
required_status=None,
task_id=None,
)
# Wait until the device status has been updated to the target status
while self.get_status(device_name=device.name).name != status.name:
time.sleep(0.1)

def add_devices_to_db(self):
"""
Expand Down Expand Up @@ -302,7 +299,7 @@ def occupy_device(self, device: BaseDevice | str, task_id: ObjectId):
device_name = device.name if isinstance(device, BaseDevice) else device
# Wait until the device status has been updated to OCCUPIED
while self.get_status(device_name=device_name).name != "OCCUPIED":
time.sleep(0.1)
time.sleep(0.5)

def get_devices_by_task(self, task_id: ObjectId | None) -> list[BaseDevice]:
"""Get devices given a task id (regardless of its status!)."""
Expand Down Expand Up @@ -341,7 +338,7 @@ def release_device(self, device_name: str):
)
# wait until the device status has been updated to IDLE
while self.get_status(device_name=device_name).name != "IDLE":
time.sleep(0.1)
time.sleep(0.5)

def get_samples_on_device(self, device_name: str):
"""Get all samples on a device."""
Expand Down Expand Up @@ -395,7 +392,7 @@ def _update_status(
f"not in allowed set of statuses {[status.name for status in required_status]}. "
f"Cannot change status to {target_status.name}"
)

previous_update_time = device_entry["last_updated"]
self._device_collection.update_one(
{"name": device_name},
{
Expand All @@ -407,8 +404,11 @@ def _update_status(
},
)
# wait until the device status has been updated to target_status
while self.get_status(device_name=device_name).name != target_status.name:
time.sleep(0.1)
while (
self._device_collection.find_one({"name": device_name})["last_updated"]
== previous_update_time
):
time.sleep(0.5)

def query_property(self, device_name: str, prop: str):
"""
Expand Down Expand Up @@ -454,7 +454,7 @@ def set_message(self, device_name: str, message: str):
self.get_device(device_name=device_name)["last_updated"]
== previous_update_time
):
time.sleep(0.1)
time.sleep(0.5)

def get_message(self, device_name: str) -> str:
"""Gets the current device message. Message is used to communicate device state with the user dashboard.
Expand Down Expand Up @@ -517,7 +517,7 @@ def set_all_attributes(self, device_name: str, attributes: dict):
self.get_device(device_name=device_name)["last_updated"]
== previous_update_time
):
time.sleep(0.1)
time.sleep(0.5)

def set_attribute(self, device_name: str, attribute: str, value: Any):
"""Sets a device attribute. Attributes are used to store device-specific values in the database.
Expand All @@ -543,7 +543,7 @@ def set_attribute(self, device_name: str, attribute: str, value: Any):
self.get_device(device_name=device_name)["last_updated"]
== previous_update_time
):
time.sleep(0.1)
time.sleep(0.5)

def pause_device(self, device_name: str):
"""Request pause for a specific device."""
Expand All @@ -568,7 +568,7 @@ def pause_device(self, device_name: str):
self.get_device(device_name=device_name)["pause_status"].name
!= new_pause_status
):
time.sleep(0.1)
time.sleep(0.5)

def unpause_device(self, device_name: str):
"""Unpause a device."""
Expand All @@ -595,7 +595,7 @@ def unpause_device(self, device_name: str):
while (
self.get_device(device_name=device_name)["pause_status"].name != "RELEASED"
):
time.sleep(0.1)
time.sleep(0.5)

def __exit__(self, exc_type, exc_value, traceback):
"""Disconnect from all devices when exiting the context manager."""
Expand Down
2 changes: 1 addition & 1 deletion alab_management/experiment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def run(self):
)
while True:
self._loop()
time.sleep(0.1)
time.sleep(0.5)

def _loop(self):
self.handle_pending_experiments()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def save_experiment(self, experiment_id: ObjectId):
)
is None
):
time.sleep(0.1)
time.sleep(0.5)
else:
self._completed_experiment_collection.insert_one(experiment_dict)

Expand Down
4 changes: 2 additions & 2 deletions alab_management/experiment_view/experiment_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def update_experiment_status(self, exp_id: ObjectId, status: ExperimentStatus):
)
# Wait until experiment status is updated in the database
while self.get_experiment(exp_id=exp_id)["status"] != status.name:
time.sleep(0.1)
time.sleep(0.5)

def update_sample_task_id(
self, exp_id, sample_ids: list[ObjectId], task_ids: list[ObjectId]
Expand Down Expand Up @@ -175,7 +175,7 @@ def update_sample_task_id(
updated_task_ids = [task["task_id"] for task in experiment["tasks"]]
if updated_sample_ids == sample_ids and updated_task_ids == task_ids:
update = "completed"
time.sleep(0.1)
time.sleep(0.5)

def get_experiment_by_task_id(self, task_id: ObjectId) -> dict[str, Any] | None:
"""Get an experiment that contains a task with the given task_id."""
Expand Down
2 changes: 1 addition & 1 deletion alab_management/sample_view/completed_sample_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def save_sample(self, sample_id: ObjectId):
self._completed_sample_collection.find_one({"_id": ObjectId(sample_id)})
is None
):
time.sleep(0.1)
time.sleep(0.5)

def exists(self, sample_id: ObjectId | str) -> bool:
"""Check if a sample exists in the database.
Expand Down
13 changes: 4 additions & 9 deletions alab_management/sample_view/sample_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def add_sample_positions_to_db(
self._sample_positions_collection.insert_one(new_entry)
# Wait until the sample position is created
while self.get_sample_position(name) is None:
time.sleep(0.1)
time.sleep(0.5)

def clean_up_sample_position_collection(self):
"""Drop the sample position collection."""
Expand Down Expand Up @@ -323,7 +323,7 @@ def lock_sample_position(self, task_id: ObjectId, position: str):
)
# Wait until the position is locked successfully
while not self.is_locked_position(position):
time.sleep(0.1)
time.sleep(0.5)

def release_sample_position(self, position: str):
"""Unlock a sample position."""
Expand All @@ -340,7 +340,7 @@ def release_sample_position(self, position: str):
)
# Wait until the position is released successfully
while self.is_locked_position(position):
time.sleep(0.1)
time.sleep(0.5)

def get_sample_positions_by_task(self, task_id: ObjectId | None) -> list[str]:
"""Get the list of sample positions that is locked by a task (given task id)."""
Expand Down Expand Up @@ -396,7 +396,7 @@ def create_sample(
result = self._sample_collection.insert_one(entry)
# Wait until the sample is created
while not self.exists(result.inserted_id):
time.sleep(0.1)
time.sleep(0.5)
return cast(ObjectId, result.inserted_id)

def get_sample(self, sample_id: ObjectId) -> Sample:
Expand Down Expand Up @@ -441,11 +441,6 @@ def update_sample_task_id(self, sample_id: ObjectId, task_id: ObjectId | None):
}
},
)
# Wait until the task id is updated
while (
self._sample_collection.find_one({"_id": sample_id})["task_id"] != task_id
):
time.sleep(0.1)

def update_sample_metadata(self, sample_id: ObjectId, metadata: dict[str, Any]):
"""Update the metadata for a sample. This adds new metadata or updates existing metadata."""
Expand Down
2 changes: 1 addition & 1 deletion alab_management/scripts/launch_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def launch_worker(args):

args = make_argument_parser().parse_args(
args=["alab_management.task_actor", *args],
namespace=Namespace(processes=4, threads=128 * 4),
namespace=Namespace(processes=12, threads=128),
)
launch(args=args)
return True
2 changes: 1 addition & 1 deletion alab_management/scripts/windows_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def main(self):
self.worker_process.daemon = False
self.worker_process.start()
while self.running:
time.sleep(0.1)
time.sleep(0.5)
if not self.alabos_thread.is_alive() and not self.warned:
print("AlabOS thread is dead")
user_input_view._alarm.alert(
Expand Down
8 changes: 4 additions & 4 deletions alab_management/task_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def run(self):
"""Start the loop."""
while True:
self._loop()
time.sleep(0.1)
time.sleep(0.5)

def _loop(self):
self.submit_ready_tasks()
Expand Down Expand Up @@ -362,7 +362,7 @@ def _handle_requested_resources(self, request_entry: dict[str, Any]):
)["parsed_sample_positions_request"] != [
dict(spr) for spr in parsed_sample_positions_request
]:
time.sleep(0.1)
time.sleep(0.5)

sample_positions = self.sample_view.request_sample_positions(
task_id=task_id, sample_positions=parsed_sample_positions_request
Expand All @@ -387,7 +387,7 @@ def _handle_requested_resources(self, request_entry: dict[str, Any]):
self.get_request(request_entry["_id"], projection=["status"])["status"]
!= RequestStatus.ERROR.name
):
time.sleep(0.1)
time.sleep(0.5)
return

# if both devices and sample positions can be satisfied
Expand Down Expand Up @@ -419,7 +419,7 @@ def _handle_requested_resources(self, request_entry: dict[str, Any]):
self._release_devices(devices)
self._release_sample_positions(sample_positions)
return
time.sleep(0.1)
time.sleep(0.5)
# label the resources as occupied
self._occupy_devices(devices=devices, task_id=task_id)
self._occupy_sample_positions(
Expand Down
24 changes: 12 additions & 12 deletions alab_management/task_view/task_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def create_task(
result = self._task_collection.insert_one(entry)
# Wait until the task is inserted
while not self.exists(task_id=cast(ObjectId, result.inserted_id)):
time.sleep(0.1)
time.sleep(0.5)

return cast(ObjectId, result.inserted_id)

Expand Down Expand Up @@ -115,7 +115,7 @@ def create_subtask(
)
# Wait until the subtask is inserted
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.1)
time.sleep(0.5)
return subtask_id

def get_task(self, task_id: ObjectId, encode: bool = False) -> dict[str, Any]:
Expand Down Expand Up @@ -171,7 +171,7 @@ def update_status(self, task_id: ObjectId, status: TaskStatus):
status: the new status of the task
"""
task = self.get_task(task_id=task_id, encode=False)

previous_update_time = task["last_updated"]
update_dict = {
"status": status.name,
"last_updated": datetime.now(),
Expand All @@ -186,8 +186,8 @@ def update_status(self, task_id: ObjectId, status: TaskStatus):
{"$set": update_dict},
)
# Wait until the status is updated
while self.get_status(task_id=task_id).name != status.name:
time.sleep(0.1)
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.5)

if status is TaskStatus.COMPLETED:
# try to figure out tasks that is READY
Expand Down Expand Up @@ -241,7 +241,7 @@ def update_status(self, task_id: ObjectId, status: TaskStatus):
self.get_task(task_id=next_task_id)["last_updated"]
== previous_update_time
):
time.sleep(0.1)
time.sleep(0.5)
self.try_to_mark_task_ready(
task_id=next_task_id
) # in case it was only waiting on task we just cancelled
Expand Down Expand Up @@ -274,7 +274,7 @@ def update_subtask_status(
)
# Wait until the status is updated
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.1)
time.sleep(0.5)

def update_result(
self, task_id: ObjectId, name: str | None = None, value: Any = None
Expand Down Expand Up @@ -306,7 +306,7 @@ def update_result(
)
# Wait until the status is updated
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.1)
time.sleep(0.5)

def update_subtask_result(
self, task_id: ObjectId, subtask_id: ObjectId, result: Any
Expand Down Expand Up @@ -345,7 +345,7 @@ def update_subtask_result(
)
# Wait until the status is updated
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.1)
time.sleep(0.5)

def try_to_mark_task_ready(self, task_id: ObjectId):
"""
Expand Down Expand Up @@ -451,7 +451,7 @@ def update_task_dependency(
self.get_task(task_id=task_id, encode=False)["last_updated"]
== previous_update_time
):
time.sleep(0.1)
time.sleep(0.5)

def set_message(self, task_id: ObjectId, message: str):
"""Set message for one task. This is displayed on the dashboard."""
Expand All @@ -467,7 +467,7 @@ def set_message(self, task_id: ObjectId, message: str):
)
# Wait until the status is updated
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.1)
time.sleep(0.5)

def set_task_actor_id(self, task_id: ObjectId, message_id: str):
"""
Expand All @@ -489,7 +489,7 @@ def set_task_actor_id(self, task_id: ObjectId, message_id: str):
)
# Wait until the status is updated
while self.get_task(task_id=task_id)["last_updated"] == previous_update_time:
time.sleep(0.1)
time.sleep(0.5)

def mark_task_as_cancelling(self, task_id: ObjectId):
"""
Expand Down
16 changes: 9 additions & 7 deletions alab_management/user_input.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from datetime import datetime
from enum import Enum
from typing import Any, cast

Expand Down Expand Up @@ -69,11 +70,12 @@ def insert_request(
"options": [str(opt) for opt in options],
"status": UserRequestStatus.PENDING.value,
"request_context": context,
"last_updated": datetime.now(),
}
)
# Wait until the request is inserted
while self.get_request(request_id)["status"] != UserRequestStatus.PENDING.value:
time.sleep(0.1)
while self.get_request(request_id) is None:
time.sleep(0.5)
if maintenance is True:
category = "Maintenance"
self._alarm.alert(f"User input requested: {prompt}", category)
Expand All @@ -94,21 +96,21 @@ def get_request(self, request_id: ObjectId) -> dict[str, Any]:
def update_request_status(self, request_id: ObjectId, response: str, note: str):
"""Update the status of a request."""
self.get_request(request_id) # will error is request does not exist
previous_update_time = self.get_request(request_id)["last_updated"]
self._input_collection.update_one(
{"_id": request_id},
{
"$set": {
"response": response,
"note": note,
"status": UserRequestStatus.FULLFILLED.value,
"last_updated": datetime.now(),
}
},
)
# Wait until the status is updated
while (
self.get_request(request_id)["status"] != UserRequestStatus.FULLFILLED.value
):
time.sleep(0.1)
while self.get_request(request_id)["last_updated"] == previous_update_time:
time.sleep(0.5)

def retrieve_user_input(self, request_id: ObjectId) -> str:
"""
Expand All @@ -122,7 +124,7 @@ def retrieve_user_input(self, request_id: ObjectId) -> str:
if request is None:
raise ValueError(f"User input request id {request_id} does not exist!")
status = UserRequestStatus(request["status"])
time.sleep(0.1)
time.sleep(0.5)
return request["response"]

def clean_up_user_input_collection(self):
Expand Down
Loading

0 comments on commit 70ab5b2

Please sign in to comment.