diff --git a/examples/protocols/calibration/multicolor-protocol-calibration-small/multicolor-particle-calibration-small.py b/examples/protocols/calibration/multicolor-protocol-calibration-small/multicolor-particle-calibration-small.py index 728a28d..f06e1a2 100644 --- a/examples/protocols/calibration/multicolor-protocol-calibration-small/multicolor-particle-calibration-small.py +++ b/examples/protocols/calibration/multicolor-protocol-calibration-small/multicolor-particle-calibration-small.py @@ -315,7 +315,7 @@ parameter_values=[], ) -execution.to_dot().view() +# execution.to_dot().view() with open(os.path.join(OUT_DIR, f"{filename}.nt"), "w") as f: f.write(doc.write_string(sbol3.SORTED_NTRIPLES).strip()) diff --git a/labop/execution/behavior_dynamics.py b/labop/execution/behavior_dynamics.py index 5244973..8244b4d 100644 --- a/labop/execution/behavior_dynamics.py +++ b/labop/execution/behavior_dynamics.py @@ -16,10 +16,14 @@ # Project packages import uml -from labop import ActivityNodeExecution, SampleArray, SampleCollection, SampleMap from labop.data import deserialize_sample_format, serialize_sample_format from labop.strings import Strings +from ..activity_node_execution import ActivityNodeExecution +from ..sample_array import SampleArray +from ..sample_collection import SampleCollection +from ..sample_map import SampleMap + class SampleProvenanceObserver: """ diff --git a/labop/execution/execution_engine.py b/labop/execution/execution_engine.py index 6ddf49c..dc971f1 100644 --- a/labop/execution/execution_engine.py +++ b/labop/execution/execution_engine.py @@ -2,6 +2,7 @@ import hashlib import logging import os +import types import uuid from abc import ABC from typing import Callable, Dict, List, Optional, Tuple, Union @@ -23,16 +24,21 @@ from labop.sample_data import SampleData from labop.strings import Strings from uml import ActivityNode, CallBehaviorAction +from uml.action import Action from uml.activity import Activity from uml.activity_edge import ActivityEdge from uml.activity_parameter_node import ActivityParameterNode +from uml.fork_node import ForkNode +from uml.literal_specification import LiteralSpecification from uml.object_flow import ObjectFlow from uml.output_pin import OutputPin +from uml.parameter import Parameter from uml.pin import Pin from uml.utils import WellFormednessIssue, WellformednessLevels, literal from .behavior_dynamics import SampleProvenanceObserver from .execution_context import ExecutionContext +from .primitive_execution import primitive_to_output_function l: logging.Logger = logging.getLogger(__file__) l.setLevel(logging.ERROR) @@ -182,6 +188,16 @@ def get_current_time(self, as_string: bool = False): cur_time = self.start_time + rel_start return cur_time if not as_string else str(cur_time) + def initialize_primitive_compute_output(self, doc: sbol3.Document): + for k, v in primitive_to_output_function.items(): + try: + p = Primitive.get_primitive(doc, k, copy_to_doc=False) + p.compute_output = types.MethodType(v, p) + except Exception as e: + l.warning( + f"Could not set compute_output() for primitive {k}, did you import the correct library?" + ) + def initialize( self, protocol: Protocol, @@ -198,7 +214,7 @@ def initialize( if self.use_defined_primitives: # Define the compute_output function for known primitives - Primitive.initialize_primitive_compute_output(doc) + self.initialize_primitive_compute_output(doc) # First, set up the record for the protocol and parameter values if self.ex is not None and overwrite_execution: @@ -468,7 +484,8 @@ def next_tokens( ActivityEdgeFlow( edge=edge, token_source=record, - value=node.get_value( + value=self.get_value( + node, edge, parameter_value_map, node_outputs, @@ -505,7 +522,8 @@ def next_tokens( ActivityEdgeFlow( edge=edge, token_source=record, - value=node.get_value( + value=self.get_value( + node, edge, parameter_value_map, node_outputs, @@ -559,6 +577,178 @@ def next_tokens( return new_tokens + def get_value( + self, + node: ActivityNode, + edge: "ActivityEdge", + node_inputs: Dict[str, Union[List[LiteralSpecification], LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + value = "" + reference = False + from uml.control_flow import ControlFlow + from uml.object_flow import ObjectFlow + + if isinstance(edge, ControlFlow): + value = ["uml.ControlFlow"] + elif isinstance(edge, ObjectFlow): + if isinstance(node, Pin): + value = self.get_pin_value( + node, + edge, + node_inputs, + node_outputs, + sample_format, + invocation_hash, + ) + elif isinstance(node, ForkNode): + value = self.get_fork_node_value( + node, + edge, + node_inputs, + node_outputs, + sample_format, + invocation_hash, + ) + elif isinstance(node, ActivityParameterNode): + value = self.get_activity_parameter_node_value( + node, + edge, + node_inputs, + node_outputs, + sample_format, + invocation_hash, + ) + elif isinstance(node, Action): + value = self.get_action_value( + node, + edge, + node_inputs, + node_outputs, + sample_format, + invocation_hash, + ) + + reference = lambda v: (isinstance(v, sbol3.Identified) and v.identity != None) + if isinstance(value, list): + value = [literal(v, reference=reference(v)) for v in value] + else: + value = [literal(value, reference=reference(value))] + return value + + # from OutputPin + def get_pin_value( + self, + node: "Pin", + edge: "ActivityEdge", + node_inputs: Dict[str, Union[List[LiteralSpecification], LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + value = node_inputs[node.name] + return value + + # from ForkNode + def get_fork_node_value( + self, + node: "ForkNode", + edge: "ActivityEdge", + parameter_value_map: Dict[str, List[LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + value = next(iter(parameter_value_map.values())) + return value + + # from ActivityParameterNode + def get_activity_parameter_node_value( + self, + node: ActivityParameterNode, + edge: "ActivityEdge", + parameter_value_map: Dict[str, List[LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + if node.is_output(): + value = parameter_value_map[node.name] + else: + value = "" + return value + + # from Action + def get_action_value( + self, + node: Action, + edge: "ActivityEdge", + parameter_value_map: Dict[str, List[LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + parameter = node.get_parameter(name=edge.get_target().name) + value = self.get_parameter_value( + node, + parameter, + parameter_value_map, + node_outputs, + sample_format, + invocation_hash, + ) + + return value + + # from CBA + def get_call_behavior_action_value( + self, + node: CallBehaviorAction, + edge: "ActivityEdge", + parameter_value_map: Dict[str, List[LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + from .activity import Activity + + if isinstance(node.get_behavior(), Activity): + # Activity Invocations do not send objects to their output pins + # The activity output parameters will send object to the output pins. + value = "uml.ControlFlow" + reference = False + else: + parameter = node.get_parameter(name=edge.get_target().name) + value = node_outputs( + parameter, parameter_value_map, sample_format, invocation_hash + ) + return value + + def get_parameter_value( + self, + call_behavior_action: CallBehaviorAction, + parameter: Parameter, + parameter_value_map: Dict[str, List[LiteralSpecification]], + node_outputs: Callable, + sample_format: str, + invocation_hash: int, + ): + if node_outputs: + value = node_outputs(self, parameter) + elif hasattr(call_behavior_action.get_behavior(), "compute_output"): + value = call_behavior_action.get_behavior().compute_output( + parameter_value_map, + parameter, + sample_format, + invocation_hash, + self, + ) + else: + value = f"{parameter.name}" + return value + def possible_calling_behaviors(self, node: ActivityNode): # Get all CallBehaviorAction nodes that correspond to a CallBehaviorExecution that supports the current execution of the ActivityNode tokens_supporting_node = [ diff --git a/labop/execution/harness.py b/labop/execution/harness.py index bb62ece..da08850 100644 --- a/labop/execution/harness.py +++ b/labop/execution/harness.py @@ -518,7 +518,9 @@ def __init__(self, *args, **kwargs): self.base_dir = kwargs["base_dir"] if "base_dir" in kwargs else "." self.output_dir = ( - kwargs["output_dir"] if "output_dir" in kwargs else "artifacts" + kwargs["output_dir"] + if "output_dir" in kwargs + else f"{self.protocol_name}/artifacts" ) self.full_output_dir = os.path.join(self.base_dir, self.output_dir) @@ -610,7 +612,7 @@ def __init__(self, *args, **kwargs): self._results = {} def filename_prefix(self) -> str: - return self.entry_point.__name__ + return self.protocol_name def dataset_filename(self) -> str: return self.filename_prefix() + ".data.xslx" diff --git a/labop/execution/primitive_execution.py b/labop/execution/primitive_execution.py new file mode 100644 index 0000000..a86514a --- /dev/null +++ b/labop/execution/primitive_execution.py @@ -0,0 +1,291 @@ +import logging + +from ..dataset import Dataset +from ..sample_array import SampleArray +from ..sample_data import SampleData +from ..sample_mask import SampleMask +from ..sample_metadata import SampleMetadata +from ..utils.helpers import get_short_uuid + +PRIMITIVE_BASE_NAMESPACE = "https://bioprotocols.org/labop/primitives/" + + +l = logging.Logger(__file__) +l.setLevel(logging.INFO) + + +def transfer_out(self, source, target, plan, sample_format): + if sample_format == "xarray": + sourceResult, targetResult = self.transfer(source, target, plan, sample_format) + return sourceResult + else: + raise Exception(f"Cannot initialize contents of: {self.identity}") + + +def transfer_in(self, source, target, plan, sample_format): + if sample_format == "xarray": + sourceResult, targetResult = self.transfer(source, target, plan, sample_format) + return targetResult + else: + raise Exception(f"Cannot initialize contents of: {self.identity}") + + +def transfer(self, source, target, plan, sample_format): + if sample_format == "xarray": + source_contents = source.to_data_array() + target_contents = target.to_data_array() + transfer = plan.get_map() + if ( + source.name in transfer.source_array + and target.name in transfer.target_array + ): + source_result = source_contents.rename( + {"aliquot": "source_aliquot", "array": "source_array"} + ) + target_result = target_contents.rename( + {"aliquot": "target_aliquot", "array": "target_array"} + ) + source_concentration = source_result / source_result.sum(dim="contents") + + amount_transferred = source_concentration * transfer + + source_result = source_result - amount_transferred.sum( + dim=["target_aliquot", "target_array"] + ) + target_result = target_result + amount_transferred.sum( + dim=["source_aliquot", "source_array"] + ) + + return source_result, target_result + else: + return source_contents, target_contents + else: + raise Exception(f"Cannot initialize contents of: {self.identity}") + + +def empty_container_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "samples" + and parameter.type == "http://bioprotocols.org/labop#SampleArray" + ): + # Make a SampleArray + spec = input_map["specification"] + sample_array = ( + input_map["sample_array"] if "sample_array" in input_map else None + ) + + if not sample_array: + sample_array = SampleArray.from_container_spec( + spec, sample_format=sample_format + ) + sample_array.name = spec.name + + # This attribute isn't formally specified in the ontology yet, but supports handling of different sample formats by BehaviorSpecialiations + # sample_array.format = sample_format + return sample_array + else: + return None + + +def empty_rack_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "slots" + and parameter.type == "http://bioprotocols.org/labop#SampleArray" + ): + # Make a SampleArray + spec = input_map["specification"] + sample_array = SampleArray.from_container_spec( + spec, sample_format=sample_format + ) + return sample_array + else: + return None + + +def load_container_on_instrument_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "samples" + and parameter.type == "http://bioprotocols.org/labop#SampleArray" + ): + # Make a SampleArray + spec = input_map["specification"] + sample_array = SampleArray.from_container_spec( + spec, sample_format=sample_format + ) + return sample_array + else: + return None + + +def plate_coordinates_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "samples" + and parameter.type == "http://bioprotocols.org/labop#SampleCollection" + ): + source = input_map["source"] + coordinates = input_map["coordinates"] + # convert coordinates into a boolean sample mask array + # 1. read source contents into array + # 2. create parallel array for entries noted in coordinates + mask = SampleMask.from_coordinates( + source, coordinates, sample_format=sample_format + ) + + return mask + + +def measure_absorbance_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "measurements" + and parameter.type == "http://bioprotocols.org/labop#Dataset" + ): + samples = input_map["samples"] + wl = input_map["wavelength"] + from labop.execution.lab_interface import LabInterface + + measurements = LabInterface.measure_absorbance(samples, wl.value, sample_format) + name = f"{self.display_id}.{parameter.name}.{get_short_uuid(call_behavior_execution_hash+hash(parameter))}" + sample_data = SampleData(name=name, from_samples=samples, values=measurements) + sample_metadata = SampleMetadata.for_primitive( + self, input_map, samples, sample_format=sample_format + ) + sample_dataset = Dataset(data=sample_data, metadata=[sample_metadata]) + return sample_dataset + + +def measure_fluorescence_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "measurements" + and parameter.type == "http://bioprotocols.org/labop#Dataset" + ): + samples = input_map["samples"] + exwl = input_map["excitationWavelength"] + emwl = input_map["emissionWavelength"] + bandpass = input_map["emissionBandpassWidth"] + from labop.execution.lab_interface import LabInterface + + measurements = LabInterface.measure_fluorescence( + samples, + exwl.value, + emwl.value, + bandpass.value, + sample_format, + ) + name = f"{self.display_id}.{parameter.name}.{get_short_uuid(get_short_uuid(call_behavior_execution_hash+hash(parameter)))}" + sample_data = SampleData(name=name, from_samples=samples, values=measurements) + sample_metadata = SampleMetadata.for_primitive( + self, input_map, samples, sample_format=sample_format + ) + sample_dataset = Dataset(data=sample_data, metadata=[sample_metadata]) + return sample_dataset + + +def join_metadata_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "enhanced_dataset" + and parameter.type == "http://bioprotocols.org/labop#Dataset" + ): + dataset = input_map["dataset"] + metadata = input_map["metadata"] + enhanced_dataset = Dataset(dataset=[dataset], linked_metadata=[metadata]) + return enhanced_dataset + + +def join_datasets_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "joint_dataset" + and parameter.type == "http://bioprotocols.org/labop#Dataset" + ): + datasets = input_map["dataset"] + metadata = ( + input_map["metadata"] + if "metadata" in input_map and input_map["metadata"] + else [] + ) + joint_dataset = Dataset(dataset=datasets, linked_metadata=metadata) + return joint_dataset + + +def excel_metadata_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "metadata" + and parameter.type == "http://bioprotocols.org/labop#SampleMetadata" + ): + filename = input_map["filename"] + for_samples = input_map["for_samples"] # check dataarray + metadata = SampleMetadata.from_excel( + filename, for_samples, sample_format=sample_format + ) + return metadata + + +def compute_metadata_compute_output( + self, input_map, parameter, sample_format, call_behavior_execution_hash, engine +): + if ( + parameter.name == "metadata" + and parameter.type == "http://bioprotocols.org/labop#SampleMetadata" + ): + for_samples = input_map["for_samples"] + metadata = SampleMetadata.from_sample_graph(for_samples, engine) + return metadata + + +def transfer_by_map_compute_output(self, inputs, parameter, sample_format, engine): + if ( + parameter.name == "sourceResult" + and parameter.type == "http://bioprotocols.org/labop#SampleCollection" + ): + source = input_map["source"] + target = input_map["destination"] + plan = input_map["plan"] + spec = source.container_type + contents = self.transfer_out(source, target, plan, sample_format) + name = f"{parameter.name}" + result = SampleArray(name=name, container_type=spec, contents=contents) + elif ( + parameter.name == "destinationResult" + and parameter.type == "http://bioprotocols.org/labop#SampleCollection" + ): + input_map = input_parameter_map(inputs) + source = input_map["source"] + target = input_map["destination"] + plan = input_map["plan"] + spec = source.container_type + contents = self.transfer_in(source, target, plan, sample_format) + name = f"{parameter.name}" + result = SampleArray(name=name, container_type=spec, contents=contents) + return result + + +primitive_to_output_function = { + "EmptyContainer": empty_container_compute_output, + "PlateCoordinates": plate_coordinates_compute_output, + "MeasureAbsorbance": measure_absorbance_compute_output, + "MeasureFluorescence": measure_fluorescence_compute_output, + "EmptyInstrument": empty_rack_compute_output, + "EmptyRack": empty_rack_compute_output, + "LoadContainerOnInstrument": load_container_on_instrument_compute_output, + "JoinMetadata": join_metadata_compute_output, + "JoinDatasets": join_datasets_compute_output, + "ExcelMetadata": excel_metadata_compute_output, + "ComputeMetadata": compute_metadata_compute_output, +} diff --git a/labop/primitive.py b/labop/primitive.py index 18625f9..48e5b4c 100644 --- a/labop/primitive.py +++ b/labop/primitive.py @@ -3,7 +3,6 @@ """ import logging -import types from typing import Dict, List import sbol3 @@ -11,13 +10,7 @@ from uml import PARAMETER_IN, PARAMETER_OUT, Behavior, inner_to_outer from . import inner -from .dataset import Dataset from .library import loaded_libraries -from .sample_array import SampleArray -from .sample_data import SampleData -from .sample_mask import SampleMask -from .sample_metadata import SampleMetadata -from .utils.helpers import get_short_uuid PRIMITIVE_BASE_NAMESPACE = "https://bioprotocols.org/labop/primitives/" @@ -130,7 +123,12 @@ def mark_optional(parameter): """ def compute_output( - self, inputs, parameter, sample_format, call_behavior_execution_hash + self, + inputs, + parameter, + sample_format, + call_behavior_execution_hash, + engine, ): """ Compute the value for parameter given the inputs. This default function will be overridden for specific primitives. @@ -177,293 +175,6 @@ def compute_output( ) return f"{parameter.name}" - def transfer_out(self, source, target, plan, sample_format): - if sample_format == "xarray": - sourceResult, targetResult = self.transfer( - source, target, plan, sample_format - ) - return sourceResult - else: - raise Exception(f"Cannot initialize contents of: {self.identity}") - - def transfer_in(self, source, target, plan, sample_format): - if sample_format == "xarray": - sourceResult, targetResult = self.transfer( - source, target, plan, sample_format - ) - return targetResult - else: - raise Exception(f"Cannot initialize contents of: {self.identity}") - - def transfer(self, source, target, plan, sample_format): - if sample_format == "xarray": - source_contents = source.to_data_array() - target_contents = target.to_data_array() - transfer = plan.get_map() - if ( - source.name in transfer.source_array - and target.name in transfer.target_array - ): - source_result = source_contents.rename( - {"aliquot": "source_aliquot", "array": "source_array"} - ) - target_result = target_contents.rename( - {"aliquot": "target_aliquot", "array": "target_array"} - ) - source_concentration = source_result / source_result.sum(dim="contents") - - amount_transferred = source_concentration * transfer - - source_result = source_result - amount_transferred.sum( - dim=["target_aliquot", "target_array"] - ) - target_result = target_result + amount_transferred.sum( - dim=["source_aliquot", "source_array"] - ) - - return source_result, target_result - else: - return source_contents, target_contents - else: - raise Exception(f"Cannot initialize contents of: {self.identity}") - - def empty_container_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "samples" - and parameter.type == "http://bioprotocols.org/labop#SampleArray" - ): - # Make a SampleArray - spec = input_map["specification"] - sample_array = ( - input_map["sample_array"] if "sample_array" in input_map else None - ) - - if not sample_array: - sample_array = SampleArray.from_container_spec( - spec, sample_format=sample_format - ) - sample_array.name = spec.name - - # This attribute isn't formally specified in the ontology yet, but supports handling of different sample formats by BehaviorSpecialiations - # sample_array.format = sample_format - return sample_array - else: - return None - - def empty_rack_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "slots" - and parameter.type == "http://bioprotocols.org/labop#SampleArray" - ): - # Make a SampleArray - spec = input_map["specification"] - sample_array = SampleArray.from_container_spec( - spec, sample_format=sample_format - ) - return sample_array - else: - return None - - def load_container_on_instrument_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "samples" - and parameter.type == "http://bioprotocols.org/labop#SampleArray" - ): - # Make a SampleArray - spec = input_map["specification"] - sample_array = SampleArray.from_container_spec( - spec, sample_format=sample_format - ) - return sample_array - else: - return None - - def plate_coordinates_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "samples" - and parameter.type == "http://bioprotocols.org/labop#SampleCollection" - ): - source = input_map["source"] - coordinates = input_map["coordinates"] - # convert coordinates into a boolean sample mask array - # 1. read source contents into array - # 2. create parallel array for entries noted in coordinates - mask = SampleMask.from_coordinates( - source, coordinates, sample_format=sample_format - ) - - return mask - - def measure_absorbance_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "measurements" - and parameter.type == "http://bioprotocols.org/labop#Dataset" - ): - samples = input_map["samples"] - wl = input_map["wavelength"] - from labop.execution.lab_interface import LabInterface - - measurements = LabInterface.measure_absorbance( - samples, wl.value, sample_format - ) - name = f"{self.display_id}.{parameter.name}.{get_short_uuid(call_behavior_execution_hash+hash(parameter))}" - sample_data = SampleData( - name=name, from_samples=samples, values=measurements - ) - sample_metadata = SampleMetadata.for_primitive( - self, input_map, samples, sample_format=sample_format - ) - sample_dataset = Dataset(data=sample_data, metadata=[sample_metadata]) - return sample_dataset - - def measure_fluorescence_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "measurements" - and parameter.type == "http://bioprotocols.org/labop#Dataset" - ): - samples = input_map["samples"] - exwl = input_map["excitationWavelength"] - emwl = input_map["emissionWavelength"] - bandpass = input_map["emissionBandpassWidth"] - from labop.execution.lab_interface import LabInterface - - measurements = LabInterface.measure_fluorescence( - samples, - exwl.value, - emwl.value, - bandpass.value, - sample_format, - ) - name = f"{self.display_id}.{parameter.name}.{get_short_uuid(get_short_uuid(call_behavior_execution_hash+hash(parameter)))}" - sample_data = SampleData( - name=name, from_samples=samples, values=measurements - ) - sample_metadata = SampleMetadata.for_primitive( - self, input_map, samples, sample_format=sample_format - ) - sample_dataset = Dataset(data=sample_data, metadata=[sample_metadata]) - return sample_dataset - - def join_metadata_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "enhanced_dataset" - and parameter.type == "http://bioprotocols.org/labop#Dataset" - ): - dataset = input_map["dataset"] - metadata = input_map["metadata"] - enhanced_dataset = Dataset(dataset=[dataset], linked_metadata=[metadata]) - return enhanced_dataset - - def join_datasets_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "joint_dataset" - and parameter.type == "http://bioprotocols.org/labop#Dataset" - ): - datasets = input_map["dataset"] - metadata = ( - input_map["metadata"] - if "metadata" in input_map and input_map["metadata"] - else [] - ) - joint_dataset = Dataset(dataset=datasets, linked_metadata=metadata) - return joint_dataset - - def excel_metadata_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "metadata" - and parameter.type == "http://bioprotocols.org/labop#SampleMetadata" - ): - filename = input_map["filename"] - for_samples = input_map["for_samples"] # check dataarray - metadata = SampleMetadata.from_excel( - filename, for_samples, sample_format=sample_format - ) - return metadata - - def compute_metadata_compute_output( - self, input_map, parameter, sample_format, call_behavior_execution_hash - ): - if ( - parameter.name == "metadata" - and parameter.type == "http://bioprotocols.org/labop#SampleMetadata" - ): - for_samples = input_map["for_samples"] - metadata = SampleMetadata.from_sample_graph(for_samples, engine) - return metadata - - def transfer_by_map_compute_output( - self, - inputs, - parameter, - sample_format, - ): - if ( - parameter.name == "sourceResult" - and parameter.type == "http://bioprotocols.org/labop#SampleCollection" - ): - source = input_map["source"] - target = input_map["destination"] - plan = input_map["plan"] - spec = source.container_type - contents = self.transfer_out(source, target, plan, sample_format) - name = f"{parameter.name}" - result = SampleArray(name=name, container_type=spec, contents=contents) - elif ( - parameter.name == "destinationResult" - and parameter.type == "http://bioprotocols.org/labop#SampleCollection" - ): - input_map = input_parameter_map(inputs) - source = input_map["source"] - target = input_map["destination"] - plan = input_map["plan"] - spec = source.container_type - contents = self.transfer_in(source, target, plan, sample_format) - name = f"{parameter.name}" - result = SampleArray(name=name, container_type=spec, contents=contents) - return result - - primitive_to_output_function = { - "EmptyContainer": empty_container_compute_output, - "PlateCoordinates": plate_coordinates_compute_output, - "MeasureAbsorbance": measure_absorbance_compute_output, - "MeasureFluorescence": measure_fluorescence_compute_output, - "EmptyInstrument": empty_rack_compute_output, - "EmptyRack": empty_rack_compute_output, - "LoadContainerOnInstrument": load_container_on_instrument_compute_output, - "JoinMetadata": join_metadata_compute_output, - "JoinDatasets": join_datasets_compute_output, - "ExcelMetadata": excel_metadata_compute_output, - "ComputeMetadata": compute_metadata_compute_output, - } - - def initialize_primitive_compute_output(doc: sbol3.Document): - for k, v in Primitive.primitive_to_output_function.items(): - try: - p = Primitive.get_primitive(doc, k, copy_to_doc=False) - p.compute_output = types.MethodType(v, p) - except Exception as e: - l.warning( - f"Could not set compute_output() for primitive {k}, did you import the correct library?" - ) - def declare_primitive( document: sbol3.Document, library: str, diff --git a/labop/sample_metadata.py b/labop/sample_metadata.py index 93477d4..5c007f2 100644 --- a/labop/sample_metadata.py +++ b/labop/sample_metadata.py @@ -128,7 +128,7 @@ def to_dataarray(self): return deserialize_sample_format(self.descriptions, parent=self) def from_sample_graph(for_samples, engine, record_source=False): - metadata = labop.SampleMetadata(for_samples=for_samples) + metadata = SampleMetadata(for_samples=for_samples) if engine.sample_format == Strings.XARRAY: # Convert pd.DataFrame into xr.DataArray diff --git a/labop_convert/markdown/markdown_specialization.py b/labop_convert/markdown/markdown_specialization.py index 8ea3bab..cf428a8 100644 --- a/labop_convert/markdown/markdown_specialization.py +++ b/labop_convert/markdown/markdown_specialization.py @@ -12,10 +12,10 @@ import tyto import xarray as xr -from labop import SampleMask from labop.data import deserialize_sample_format from labop.sample_array import SampleArray from labop.sample_collection import SampleCollection +from labop.sample_mask import SampleMask from labop.strings import Strings from labop_convert.behavior_specialization import DefaultBehaviorSpecialization from uml import ( diff --git a/test/test_LUDOX_protocol.py b/test/test_LUDOX_protocol.py index 7aac739..6e50914 100644 --- a/test/test_LUDOX_protocol.py +++ b/test/test_LUDOX_protocol.py @@ -27,7 +27,7 @@ def create_protocol(self, doc, protocol: labop.Protocol) -> labop.Protocol: def test_create_protocol(self): harness = labop.execution.harness.ProtocolHarness( clean_output=True, - base_dir=os.path.dirname(__file__), + base_dir=os.path.join(os.path.dirname(__file__), "out"), entry_point=self.create_protocol, namespace="https://bbn.com/scratch/", protocol_name="iGEM_LUDOX_OD_calibration_2018", diff --git a/test/test_decision_nodes.py b/test/test_decision_nodes.py index cef6445..359b246 100644 --- a/test/test_decision_nodes.py +++ b/test/test_decision_nodes.py @@ -47,7 +47,7 @@ def create_protocol(self, doc: sbol3.Document, protocol: labop.Protocol): doc.add(pH_meter_calibrated) def pH_meter_calibrated_compute_output( - inputs, parameter, sample_format, record_hash + inputs, parameter, sample_format, record_hash, engine ): return True diff --git a/uml/action.py b/uml/action.py index 84314d5..e0b8380 100644 --- a/uml/action.py +++ b/uml/action.py @@ -2,7 +2,7 @@ The Action class defines the functions corresponding to the dynamically generated labop class Action """ -from typing import Callable, Dict, List +from typing import Dict, List import sbol3 @@ -161,7 +161,7 @@ def get_parameter(self, name: str = None, ordered=False): return next(iter(params)) else: raise ValueError( - f"Invalid parameter {name} provided for Primitive {behavior.display_id}" + f"Invalid parameter {name} provided for Primitive {self.get_behavior().display_id}" ) def is_well_formed(self) -> List[WellFormednessIssue]: @@ -270,28 +270,3 @@ def enabled( else: return control_tokens_present - - def get_value( - self, - edge: "ActivityEdge", - parameter_value_map: Dict[str, List[LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - ): - from .control_flow import ControlFlow - from .object_flow import ObjectFlow - - value = "" - reference = False - - if isinstance(edge, ControlFlow): - value = "uml.ControlFlow" - elif isinstance(edge, ObjectFlow): - parameter = self.get_parameter(name=edge.get_source().name) - value = self.get_parameter_value( - parameter, parameter_value_map, node_outputs, sample_format - ) - reference = isinstance(value, sbol3.Identified) and value.identity != None - - value = [literal(value, reference=reference)] - return value diff --git a/uml/activity_node.py b/uml/activity_node.py index 3cb4306..5b9e858 100644 --- a/uml/activity_node.py +++ b/uml/activity_node.py @@ -186,28 +186,6 @@ def next_tokens_callback( edge_tokens[edge] = edge_value return edge_tokens - def get_value( - self, - edge: "ActivityEdge", - parameter_value_map: Dict[str, List[LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - from .control_flow import ControlFlow - from .object_flow import ObjectFlow - - value = "" - reference = False - - if isinstance(edge, ControlFlow): - value = "uml.ControlFlow" - elif isinstance(edge, ObjectFlow): - raise Exception("ActivityNode cannot get_value of outgoing ObjectFlow") - - value = [literal(value, reference=reference)] - return value - def is_well_formed(self) -> List[WellFormednessIssue]: return [] diff --git a/uml/activity_parameter_node.py b/uml/activity_parameter_node.py index 5fc5cd3..b5452fd 100644 --- a/uml/activity_parameter_node.py +++ b/uml/activity_parameter_node.py @@ -36,18 +36,6 @@ def is_output(self): def is_input(self): return self.get_parameter().is_input() - # def get_value(self) -> Dict[Parameter, LiteralSpecification]: - # values = [ - # i.value.get_value() - # for i in self.incoming_flows - # if isinstance(i.edge.lookup(), ObjectFlow) - # ] - # assert len(values) < 2, "ActivityParameterNode has too many incoming values" - # if len(values) == 1: - # return values[0] - # else: - # return None - def next_tokens_callback( self, node_inputs: Dict["ActivityEdge", LiteralSpecification], @@ -101,24 +89,3 @@ def next_tokens_callback( else: edge_tokens = [] return edge_tokens - - def get_value( - self, - edge: "ActivityEdge", - parameter_value_map: Dict[str, List[LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - value = "" - reference = False - - if isinstance(edge, ControlFlow): - value = "uml.ControlFlow" - elif isinstance(edge, ObjectFlow): - if self.is_output(): - value = parameter_value_map[self.name] - reference = True - - value = [literal(value, reference=reference)] - return value diff --git a/uml/call_behavior_action.py b/uml/call_behavior_action.py index bc8e979..e2d240d 100644 --- a/uml/call_behavior_action.py +++ b/uml/call_behavior_action.py @@ -3,7 +3,7 @@ dynamically generated labop class CallBehaviorAction """ -from typing import Callable, Dict, List +from typing import List import graphviz import sbol3 @@ -98,70 +98,6 @@ def dot_attrs(self, incoming_edges: List["ActivityEdge"]): shape = "none" return {"label": label, "shape": shape, "style": "rounded"} - def get_value( - self, - edge: "ActivityEdge", - parameter_value_map: Dict[str, List[LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - from .activity import Activity - - if isinstance(edge, ControlFlow): - return ActivityNode.get_value( - self, - edge, - parameter_value_map, - node_outputs, - sample_format, - invocation_hash, - ) - elif isinstance(edge, ObjectFlow): - if isinstance(self.get_behavior(), Activity): - # Activity Invocations do not send objects to their output pins - # The activity output parameters will send object to the output pins. - value = "uml.ControlFlow" - reference = False - else: - parameter = self.get_parameter(name=edge.get_target().name) - value = self.get_parameter_value( - parameter, - parameter_value_map, - node_outputs, - sample_format, - invocation_hash, - ) - - reference = ( - isinstance(value, sbol3.Identified) and value.identity != None - ) - if isinstance(value, list) or isinstance( - value, sbol3.ownedobject.OwnedObjectListProperty - ): - value = [literal(v, reference=reference) for v in value] - else: - value = [literal(value, reference=reference)] - return value - - def get_parameter_value( - self, - parameter: Parameter, - parameter_value_map: Dict[str, List[LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - if node_outputs: - value = node_outputs(self, parameter) - elif hasattr(self.get_behavior(), "compute_output"): - value = self.get_behavior().compute_output( - parameter_value_map, parameter, sample_format, invocation_hash - ) - else: - value = f"{parameter.name}" - return value - def auto_advance(self): """ Is the node executable without additional manual input? diff --git a/uml/fork_node.py b/uml/fork_node.py index 76e3ebc..a7a09fc 100644 --- a/uml/fork_node.py +++ b/uml/fork_node.py @@ -79,32 +79,3 @@ def next_tokens_callback( for edge in out_edges ] return edge_tokens - - def get_value( - self, - edge: "ActivityEdge", - parameter_value_map: Dict[str, List[LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - if isinstance(edge, ControlFlow): - return ActivityNode.get_value( - self, - edge, - parameter_value_map, - node_outputs, - sample_format, - invocation_hash, - ) - elif isinstance(edge, ObjectFlow): - value = next(iter(parameter_value_map.values())) - reference = True - - if isinstance(value, list) or isinstance( - value, sbol3.ownedobject.OwnedObjectListProperty - ): - value = [literal(v, reference=reference) for v in value] - else: - value = [literal(value, reference=reference)] - return value diff --git a/uml/input_pin.py b/uml/input_pin.py index 5ef0a39..ea7515b 100644 --- a/uml/input_pin.py +++ b/uml/input_pin.py @@ -2,7 +2,7 @@ The InputPin class defines the functions corresponding to the dynamically generated labop class InputPin """ -from typing import Callable, Dict, List, Union +from typing import Callable, Dict, List import sbol3 @@ -57,33 +57,6 @@ def next_tokens_callback( edge_tokens = [(None, source, pin_value) for pin_value in pin_values] return edge_tokens - def get_value( - self, - edge: "ActivityEdge", - node_inputs: Dict[str, Union[List[LiteralSpecification], LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - value = "" - reference = False - from .control_flow import ControlFlow - from .object_flow import ObjectFlow - - if isinstance(edge, ControlFlow): - value = "uml.ControlFlow" - elif isinstance(edge, ObjectFlow): - value = node_inputs[self.name] - reference = True - - if isinstance(value, list) or isinstance( - value, sbol3.ownedobject.OwnedObjectListProperty - ): - value = [literal(v, reference=reference) for v in value] - else: - value = [literal(value, reference=reference)] - return value - def is_well_formed(self) -> List[WellFormednessIssue]: """ An InputPin is well formed if: diff --git a/uml/output_pin.py b/uml/output_pin.py index ca8adc1..821a3a5 100644 --- a/uml/output_pin.py +++ b/uml/output_pin.py @@ -2,7 +2,6 @@ The OutputPin class defines the functions corresponding to the dynamically generated labop class OutputPin """ -from typing import Callable, Dict, List, Union from . import inner from .literal_specification import LiteralSpecification @@ -14,33 +13,3 @@ class OutputPin(inner.OutputPin, Pin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._where_defined = self.get_where_defined() - - def get_value( - self, - edge: "ActivityEdge", - node_inputs: Dict[str, Union[List[LiteralSpecification], LiteralSpecification]], - node_outputs: Callable, - sample_format: str, - invocation_hash: int, - ): - from .control_flow import ControlFlow - from .object_flow import ObjectFlow - - value = "" - reference = False - - if isinstance(edge, ControlFlow): - value = ["uml.ControlFlow"] - elif isinstance(edge, ObjectFlow): - call_node = self.get_parent() - # parameter = call_node.get_parameter( - # edge.get_source().name - # ).property_value - value = node_inputs[self.name] - reference = True - - if isinstance(value, list): - value = [literal(v, reference=reference) for v in value] - else: - value = [literal(value, reference=reference)] - return value