Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add update_sample_metadata() function to lab_view for convenient access #58

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 39 additions & 67 deletions alab_management/lab_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ def preprocess(cls, values):
"""Preprocess the request to make sure the request is in the correct format."""
values = values["__root__"]
# if the sample position request is string, we will automatically add a number attribute = 1.
values = {
k: [
SamplePositionRequest.from_str(v_) if isinstance(v_, str) else v_
for v_ in v
]
for k, v in values.items()
}
values = {k: [SamplePositionRequest.from_str(v_) if isinstance(v_, str) else v_ for v_ in v] for k, v in values.items()}
return {"__root__": values}


Expand All @@ -66,9 +60,7 @@ class LabView:

def __init__(self, task_id: ObjectId):
self._task_view = TaskView()
self.__task_entry = self._task_view.get_task(
task_id=task_id
) # will throw error if task_id does not exist
self.__task_entry = self._task_view.get_task(task_id=task_id) # will throw error if task_id does not exist
self._experiment_view = ExperimentView()
self._task_id = task_id
self._sample_view = SampleView()
Expand All @@ -86,9 +78,7 @@ def task_id(self) -> ObjectId:
@contextmanager
def request_resources(
self,
resource_request: Dict[
Optional[Union[Type[BaseDevice], str]], Dict[str, Union[str, int]]
],
resource_request: Dict[Optional[Union[Type[BaseDevice], str]], Dict[str, Union[str, int]]],
priority: Optional[int] = None,
timeout: Optional[float] = None,
):
Expand All @@ -114,18 +104,13 @@ def request_resources(
"""
priority = priority or self.priority

self._task_view.update_status(
task_id=self.task_id, status=TaskStatus.REQUESTING_RESOURCES
)
result = self._resource_requester.request_resources(
resource_request=resource_request, timeout=timeout, priority=priority
)
self._task_view.update_status(task_id=self.task_id, status=TaskStatus.REQUESTING_RESOURCES)
result = self._resource_requester.request_resources(resource_request=resource_request, timeout=timeout, priority=priority)
devices = result["devices"]
sample_positions = result["sample_positions"]
request_id = result["request_id"]
devices = {
device_type: self._device_client.create_device_wrapper(device_name)
for device_type, device_name in devices.items()
device_type: self._device_client.create_device_wrapper(device_name) for device_type, device_name in devices.items()
} # type: ignore
self._task_view.update_status(task_id=self.task_id, status=TaskStatus.RUNNING)
yield devices, sample_positions
Expand All @@ -141,9 +126,7 @@ def _sample_name_to_id(self, sample_name: str) -> ObjectId:
for sample in self.__task_entry["samples"]:
if sample["name"] == sample_name:
return sample["sample_id"]
raise ValueError(
f"No sample with name \"{sample_name}\" found for task \"{self.__task_entry['type']}\""
)
raise ValueError(f"No sample with name \"{sample_name}\" found for task \"{self.__task_entry['type']}\"")

def get_sample(self, sample: Union[ObjectId, str]) -> Sample:
"""
Expand All @@ -169,23 +152,15 @@ def move_sample(self, sample: Union[ObjectId, str], position: Optional[str]):
:py:meth:`move_sample <alab_management.sample_view.sample_view.SampleView.move_sample>`
"""
# check if this sample position is locked by current task
if (
position is not None
and self._sample_view.get_sample_position_status(position)[1]
!= self._task_id
):
raise ValueError(
f"Cannot move sample to the new sample position ({position}) without locking it."
)
if position is not None and self._sample_view.get_sample_position_status(position)[1] != self._task_id:
raise ValueError(f"Cannot move sample to the new sample position ({position}) without locking it.")

# check if this sample is owned by current task
sample_entry = self.get_sample(sample=sample)
if sample_entry.task_id != self._task_id:
raise ValueError("Cannot move a sample that is not belong to this task.")

return self._sample_view.move_sample(
sample_id=sample_entry.sample_id, position=position
)
return self._sample_view.move_sample(sample_id=sample_entry.sample_id, position=position)

def get_locked_sample_positions(self) -> List[str]:
"""Get a list of sample positions that are occupied by this task."""
Expand All @@ -195,9 +170,25 @@ def get_sample_position_parent_device(self, position: str) -> Optional[str]:
"""Get the name of the device that owns the sample position."""
return self._sample_view.get_sample_position_parent_device(position=position)

def run_subtask(
self, task: Type[BaseTask], samples: List[Union[ObjectId, str]], **kwargs
):
def update_sample_metadata(self, sample: Union[ObjectId, str], metadata: Dict[str, Any]):
"""
Update the metadata of a sample. `sample` can be given as either an ObjectId corresponding to sample_id,
or as a string corresponding to the sample's name within the experiment.

See Also
--------
:py:meth:`update_sample_metadata <alab_management.sample_view.sample_view.SampleView.update_sample_metadata>`
"""
if isinstance(sample, str):
sample_id = self._sample_name_to_id(sample)
elif isinstance(sample, ObjectId):
sample_id = sample
else:
raise TypeError("sample must be a sample name (str) or id (ObjectId)")

return self._sample_view.update_sample_metadata(sample_id=sample_id, metadata=metadata)

def run_subtask(self, task: Type[BaseTask], samples: List[Union[ObjectId, str]], **kwargs):
"""Run a task as a subtask within the task. basically fills in task_id and lab_view for you.
this command blocks until the subtask is completed.

Expand Down Expand Up @@ -231,12 +222,8 @@ def run_subtask(
**kwargs,
)
except Exception as exc:
self._task_view.update_subtask_status(
task_id=task_id, subtask_id=subtask_id, status=TaskStatus.ERROR
)
self._task_view.update_subtask_result(
task_id=task_id, subtask_id=subtask_id, result=str(exc)
)
self._task_view.update_subtask_status(task_id=task_id, subtask_id=subtask_id, status=TaskStatus.ERROR)
self._task_view.update_subtask_result(task_id=task_id, subtask_id=subtask_id, result=str(exc))
raise Exception(
"Failed to create subtask of type {} within task {} of type {}".format(
task,
Expand All @@ -254,17 +241,11 @@ def run_subtask(
},
)
try:
self._task_view.update_subtask_status(
task_id=task_id, subtask_id=subtask_id, status=TaskStatus.RUNNING
)
self._task_view.update_subtask_status(task_id=task_id, subtask_id=subtask_id, status=TaskStatus.RUNNING)
result = subtask.run() # block until completion
except Exception as exception:
self._task_view.update_subtask_status(
task_id=task_id, subtask_id=subtask_id, status=TaskStatus.ERROR
)
self._task_view.update_subtask_result(
task_id=task_id, subtask_id=subtask_id, result=str(exception)
)
self._task_view.update_subtask_status(task_id=task_id, subtask_id=subtask_id, status=TaskStatus.ERROR)
self._task_view.update_subtask_result(task_id=task_id, subtask_id=subtask_id, result=str(exception))
self.logger.system_log(
level="ERROR",
log_data={
Expand All @@ -278,12 +259,8 @@ def run_subtask(
)
raise
else:
self._task_view.update_subtask_status(
task_id=task_id, subtask_id=subtask_id, status=TaskStatus.COMPLETED
)
self._task_view.update_subtask_result(
task_id=task_id, subtask_id=subtask_id, result=result
)
self._task_view.update_subtask_status(task_id=task_id, subtask_id=subtask_id, status=TaskStatus.COMPLETED)
self._task_view.update_subtask_result(task_id=task_id, subtask_id=subtask_id, result=result)
self.logger.system_log(
level="INFO",
log_data={
Expand Down Expand Up @@ -326,19 +303,14 @@ def update_result(self, name: str, value: Any):

def request_cleanup(self):
"""Request cleanup of the task. This function will block until the task is cleaned up."""
all_reserved_sample_positions = self._sample_view.get_sample_positions_by_task(
self.task_id
)
all_reserved_sample_positions = self._sample_view.get_sample_positions_by_task(self.task_id)

all_samples = self.__task_entry["samples"]
all_positions_with_samples = [
self._sample_view.get_sample(sample_entry["sample_id"]).position
for sample_entry in all_samples
self._sample_view.get_sample(sample_entry["sample_id"]).position for sample_entry in all_samples
]

all_positions_with_samples = [
each for each in all_positions_with_samples if each
]
all_positions_with_samples = [each for each in all_positions_with_samples if each]

self.request_user_input(
prompt="A unrecoverable error has occurred.\n"
Expand Down
Loading