From d2063fefdfac2d842611ad4fc8c8e5f73ac6e311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matt=C3=A9o=20Baussart?= Date: Wed, 11 Dec 2024 10:08:23 +0100 Subject: [PATCH] feat: add WorkflowTopology class and workflow_to_workflow_topology operator (#1902) --- src/ansys/dpf/core/__init__.py | 1 + src/ansys/dpf/core/common.py | 46 ++++ src/ansys/dpf/core/custom_container_base.py | 54 +++++ src/ansys/dpf/core/dpf_operator.py | 16 +- src/ansys/dpf/core/helpers/utils.py | 40 ++++ src/ansys/dpf/core/outputs.py | 25 +- src/ansys/dpf/core/workflow.py | 30 +++ .../dpf/core/workflow_topology/__init__.py | 26 +++ .../core/workflow_topology/data_connection.py | 200 ++++++++++++++++ .../dpf/core/workflow_topology/exposed_pin.py | 201 ++++++++++++++++ .../workflow_topology/operator_connection.py | 218 ++++++++++++++++++ .../workflow_topology/workflow_topology.py | 185 +++++++++++++++ tests/conftest.py | 2 + tests/test_any.py | 2 +- tests/test_operator.py | 58 +++++ tests/test_workflow.py | 21 ++ tests/test_workflow_topology.py | 198 ++++++++++++++++ 17 files changed, 1311 insertions(+), 12 deletions(-) create mode 100644 src/ansys/dpf/core/custom_container_base.py create mode 100644 src/ansys/dpf/core/workflow_topology/__init__.py create mode 100644 src/ansys/dpf/core/workflow_topology/data_connection.py create mode 100644 src/ansys/dpf/core/workflow_topology/exposed_pin.py create mode 100644 src/ansys/dpf/core/workflow_topology/operator_connection.py create mode 100644 src/ansys/dpf/core/workflow_topology/workflow_topology.py create mode 100644 tests/test_workflow_topology.py diff --git a/src/ansys/dpf/core/__init__.py b/src/ansys/dpf/core/__init__.py index 1c885d1d70..89711fcf5f 100644 --- a/src/ansys/dpf/core/__init__.py +++ b/src/ansys/dpf/core/__init__.py @@ -118,6 +118,7 @@ CustomTypeFieldsCollection:type = _CollectionFactory(CustomTypeField) GenericDataContainersCollection:type = _CollectionFactory(GenericDataContainer) StringFieldsCollection:type = _CollectionFactory(StringField) +OperatorsCollection: type = _CollectionFactory(Operator) AnyCollection:type = _Collection # for matplotlib diff --git a/src/ansys/dpf/core/common.py b/src/ansys/dpf/core/common.py index 1c13185b64..7e07167a20 100644 --- a/src/ansys/dpf/core/common.py +++ b/src/ansys/dpf/core/common.py @@ -32,6 +32,7 @@ import re import sys from enum import Enum +from typing import Dict from ansys.dpf.core.misc import module_exists from ansys.dpf.gate.common import locations, ProgressBarBase # noqa: F401 @@ -430,6 +431,51 @@ def type_to_special_dpf_constructors(): return _type_to_special_dpf_constructors +_derived_class_name_to_type = None + + +def derived_class_name_to_type() -> Dict[str, type]: + """ + Returns a mapping of derived class names to their corresponding Python classes. + + Returns + ------- + dict[str, type] + A dictionary mapping derived class names (str) to their corresponding + Python class objects. + """ + global _derived_class_name_to_type + if _derived_class_name_to_type is None: + from ansys.dpf.core.workflow_topology import WorkflowTopology + + _derived_class_name_to_type = {"WorkflowTopology": WorkflowTopology} + return _derived_class_name_to_type + + +def record_derived_class(class_name: str, py_class: type, overwrite: bool = False): + """ + Records a new derived class in the mapping of class names to their corresponding Python classes. + + This function updates the global dictionary that maps derived class names (str) to their corresponding + Python class objects (type). If the provided class name already exists in the dictionary, it will either + overwrite the existing mapping or leave it unchanged based on the `overwrite` flag. + + Parameters + ---------- + class_name : str + The name of the derived class to be recorded. + py_class : type + The Python class type corresponding to the derived class. + overwrite : bool, optional + A flag indicating whether to overwrite an existing entry for the `class_name`. + If `True`, the entry will be overwritten. If `False` (default), the entry will + not be overwritten if it already exists. + """ + recorded_classes = derived_class_name_to_type() + if overwrite or class_name not in recorded_classes: + recorded_classes[class_name] = py_class + + def create_dpf_instance(type, internal_obj, server): spe_constructors = type_to_special_dpf_constructors() if type in spe_constructors: diff --git a/src/ansys/dpf/core/custom_container_base.py b/src/ansys/dpf/core/custom_container_base.py new file mode 100644 index 0000000000..44df013144 --- /dev/null +++ b/src/ansys/dpf/core/custom_container_base.py @@ -0,0 +1,54 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +CustomContainerBase +=================== +This module contains the `CustomContainerBase` class, which serves as a base +for creating wrappers around `GenericDataContainer` objects. + +These wrappers provide an interface for accessing and managing data in +generic containers, enabling more intuitive usage and the addition of custom +behaviors tailored to specific use cases. +""" + +from ansys.dpf.core.generic_data_container import GenericDataContainer + + +class CustomContainerBase: + """ + Base class for custom container wrappers. + + This class provides a common interface for managing an underlying + `GenericDataContainer` object. + """ + + def __init__(self, container: GenericDataContainer) -> None: + """ + Initialize the base container with a `GenericDataContainer`. + + Parameters + ---------- + container : GenericDataContainer + The underlying data container to be wrapped by this class. + """ + self._container = container diff --git a/src/ansys/dpf/core/dpf_operator.py b/src/ansys/dpf/core/dpf_operator.py index b0e2add66b..0780091893 100644 --- a/src/ansys/dpf/core/dpf_operator.py +++ b/src/ansys/dpf/core/dpf_operator.py @@ -384,6 +384,7 @@ def _type_to_output_method(self): mesh_info, collection_base, any, + custom_container_base, ) out = [ @@ -481,6 +482,15 @@ def _type_to_output_method(self): self._api.operator_getoutput_as_any, lambda obj, type: any.Any(server=self._server, any_dpf=obj).cast(type), ), + ( + custom_container_base.CustomContainerBase, + self._api.operator_getoutput_generic_data_container, + lambda obj, type: type( + container=generic_data_container.GenericDataContainer( + generic_data_container=obj, server=self._server + ) + ), + ), ] if hasattr(self._api, "operator_getoutput_generic_data_container"): out.append( @@ -726,8 +736,10 @@ def default_config(name, server=None): def __del__(self): try: - if self._internal_obj is not None: - self._deleter_func[0](self._deleter_func[1](self)) + if hasattr(self, "_deleter_func"): + obj = self._deleter_func[1](self) + if obj is not None: + self._deleter_func[0](obj) except: warnings.warn(traceback.format_exc()) diff --git a/src/ansys/dpf/core/helpers/utils.py b/src/ansys/dpf/core/helpers/utils.py index 395f4b4ea5..b43986b884 100644 --- a/src/ansys/dpf/core/helpers/utils.py +++ b/src/ansys/dpf/core/helpers/utils.py @@ -22,6 +22,7 @@ import inspect import sys +from typing import Any, Optional def _sort_supported_kwargs(bound_method, **kwargs): @@ -48,3 +49,42 @@ def _sort_supported_kwargs(bound_method, **kwargs): warnings.warn(txt) # Return the accepted arguments return kwargs_in + + +def indent(text: Any, subsequent_indent: str = "", initial_indent: Optional[str] = None) -> str: + """Indents each line of a given text. + + Parameters + ---------- + text : Any + The input text to be indented. If it is not already a string, it will be converted to one. + subsequent_indent : str, optional + The string to prefix all lines of the text after the first line. Default is an empty string. + initial_indent : Optional[str], optional + The string to prefix the first line of the text. If not provided, `subsequent_indent` will be used. + + Returns + ------- + str + The indented text with specified prefixes applied to each line. + + Examples + -------- + >>> text = "Hello\\nWorld" + >>> print(indent(text, subsequent_indent=" ", initial_indent="--> ")) + --> Hello + World + """ + if initial_indent is None: + initial_indent = subsequent_indent + + if not isinstance(text, str): + text = str(text) + + lines = text.rstrip().splitlines() + indented_lines = [ + f"{initial_indent if index == 0 else subsequent_indent}{line}" + for (index, line) in enumerate(lines) + ] + + return "\n".join(indented_lines) diff --git a/src/ansys/dpf/core/outputs.py b/src/ansys/dpf/core/outputs.py index e7dc9c3da6..4109a669bc 100644 --- a/src/ansys/dpf/core/outputs.py +++ b/src/ansys/dpf/core/outputs.py @@ -81,17 +81,24 @@ def get_data(self): elif type_output == "int32": type_output = types.int + output = self._operator.get_output(self._pin, type_output) + type_output_derive_class = self._spec.name_derived_class + if type_output_derive_class == "": + return output + + from ansys.dpf.core.common import derived_class_name_to_type + + derived_type = derived_class_name_to_type().get(type_output_derive_class) + if derived_type is not None: + return derived_type(output) - if type_output_derive_class != "": - out_type = [ - type_tuple - for type_tuple in self._operator._type_to_output_method - if type_output_derive_class in type_tuple - ] - return out_type[0][0](self._operator.get_output(self._pin, type_output)) - else: - return self._operator.get_output(self._pin, type_output) + derived_types = [ + type_tuple + for type_tuple in self._operator._type_to_output_method + if type_output_derive_class in type_tuple + ] + return derived_types[0][0](output) def __call__(self): return self.get_data() diff --git a/src/ansys/dpf/core/workflow.py b/src/ansys/dpf/core/workflow.py index d582f5c176..634bbfe581 100644 --- a/src/ansys/dpf/core/workflow.py +++ b/src/ansys/dpf/core/workflow.py @@ -333,6 +333,7 @@ def _type_to_output_method(self): collection_base, streams_container, ) + from ansys.dpf.core.custom_container_base import CustomContainerBase out = [ (streams_container.StreamsContainer, self._api.work_flow_getoutput_streams), @@ -421,6 +422,15 @@ def _type_to_output_method(self): self._api.work_flow_getoutput_as_any, lambda obj, type: any.Any(server=self._server, any_dpf=obj).cast(type), ), + ( + CustomContainerBase, + self._api.work_flow_getoutput_generic_data_container, + lambda obj, type: type( + container=generic_data_container.GenericDataContainer( + generic_data_container=obj, server=self._server + ) + ), + ), ] if hasattr(self._api, "work_flow_connect_generic_data_container"): out.append( @@ -953,6 +963,26 @@ def to_graphviz(self, path: Union[os.PathLike, str]): """Saves the workflow to a GraphViz file.""" return self._api.work_flow_export_graphviz(self, str(path)) + @version_requires("10.0") + def get_topology(self): + """Get the topology of the workflow. + + Returns + ------- + workflow_topology : workflow_topology.WorkflowTopology + + Notes + ----- + Available from 10.0 server version. + """ + workflow_to_workflow_topology_op = dpf_operator.Operator( + "workflow_to_workflow_topology", server=self._server + ) + workflow_to_workflow_topology_op.inputs.workflow.connect(self) + workflow_topology = workflow_to_workflow_topology_op.outputs.workflow_topology() + + return workflow_topology + def __del__(self): try: if hasattr(self, "_internal_obj"): diff --git a/src/ansys/dpf/core/workflow_topology/__init__.py b/src/ansys/dpf/core/workflow_topology/__init__.py new file mode 100644 index 0000000000..1b670cd721 --- /dev/null +++ b/src/ansys/dpf/core/workflow_topology/__init__.py @@ -0,0 +1,26 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from .workflow_topology import WorkflowTopology +from .operator_connection import OperatorConnection +from .data_connection import DataConnection +from .exposed_pin import ExposedPin diff --git a/src/ansys/dpf/core/workflow_topology/data_connection.py b/src/ansys/dpf/core/workflow_topology/data_connection.py new file mode 100644 index 0000000000..5ccf6e9246 --- /dev/null +++ b/src/ansys/dpf/core/workflow_topology/data_connection.py @@ -0,0 +1,200 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +DataConnection +============== +This module contains the `DataConnection` and `DataConnectionsCollection` +classes, which represent individual connections between data and operator, +and a collection of such connections within a workflow, respectively. +""" + +from typing import Any, Iterator, Optional +from ansys.dpf.core import GenericDataContainersCollection +from ansys.dpf.core.custom_container_base import CustomContainerBase +from ansys.dpf.core.dpf_operator import Operator +from ansys.dpf.core.generic_data_container import GenericDataContainer + + +class DataConnection(CustomContainerBase): + """ + Represents a connection between a data and an operator in a workflow. + + This class provides access to the source data and target operator, as well as its pin ID. + """ + + def __init__(self, container: GenericDataContainer) -> None: + """ + Initialize an DataConnection object. + + Parameters + ---------- + container : GenericDataContainer + The underlying data container that holds the connection's information. + """ + super().__init__(container) + + self._source_data: Optional[Any] = None + self._target_operator: Optional[Operator] = None + self._target_pin_id: Optional[int] = None + + @property + def source_data(self) -> Any: + """ + Retrieve the source data of the connection. + + Returns + ------- + Any + The data serving as the source of this connection. + """ + if self._source_data is None: + self._source_data = self._container.get_property("source_data") + + return self._source_data + + @property + def target_operator(self) -> Operator: + """ + Retrieve the target operator of the connection. + + Returns + ------- + Operator + The operator serving as the target of this connection. + """ + if self._target_operator is None: + self._target_operator = self._container.get_property("target_operator", Operator) + + return self._target_operator + + @property + def target_pin_id(self) -> int: + """ + Retrieve the pin ID of the target operator. + + Returns + ------- + int + The pin ID of the target operator. + """ + if self._target_pin_id is None: + self._target_pin_id = self._container.get_property("target_pin_id", int) + + return self._target_pin_id + + def __str__(self) -> str: + """ + Return a string representation of the data connection. + + This includes the source data and target operator, with its pin ID. + + Returns + ------- + str + String representation of the data connection. + """ + from ansys.dpf.core.helpers.utils import indent + + indents = " " + return ( + "DataConnection with properties:\n" + " - source_data:\n" + f"{indent(self.source_data, indents)}\n" + " - target_operator:\n" + f"{indent(self.target_operator.name, indents)}\n" + " - target_pin_id:\n" + f"{indent(self.target_pin_id, indents)}" + ) + + +class DataConnectionsCollection: + """ + Represents a collection of data connections in a workflow. + + This class provides iterable access to all data connections, allowing retrieval + of individual connections or iteration through the entire collection. + """ + + def __init__(self, collection: GenericDataContainersCollection) -> None: + """ + Initialize an DataConnectionsCollection object. + + Parameters + ---------- + collection : GenericDataContainersCollection + The underlying collection of data connections. + """ + self._collection = collection + + def __len__(self) -> int: + """ + Return the number of data connections in the collection. + + Returns + ------- + int + The number of data connections. + """ + return len(self._collection) + + def __getitem__(self, index: int) -> DataConnection: + """ + Retrieve a data connection by its index. + + Parameters + ---------- + index : int + The index of the data connection to retrieve. + + Returns + ------- + DataConnection + The data connection at the specified index. + """ + return DataConnection(self._collection[index]) + + def __iter__(self) -> Iterator[DataConnection]: + """ + Iterate over the data connections in the collection. + + Yields + ------ + DataConnection + The next data connection in the collection. + """ + for i in range(len(self)): + yield self[i] + + def __str__(self) -> str: + """ + Return a string representation of the data connections collection. + + Returns + ------- + str + String representation of the collection. + """ + from ansys.dpf.core.helpers.utils import indent + + indents = (" ", " - ") + return "\n".join([indent(data_connection, *indents) for data_connection in self]) diff --git a/src/ansys/dpf/core/workflow_topology/exposed_pin.py b/src/ansys/dpf/core/workflow_topology/exposed_pin.py new file mode 100644 index 0000000000..761730a657 --- /dev/null +++ b/src/ansys/dpf/core/workflow_topology/exposed_pin.py @@ -0,0 +1,201 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +ExposedPin +========== +This module contains the `ExposedPin` and `ExposedPinsCollection` classes, +which represent individual exposed pins and a collection of exposed pins in a workflow, +respectively. These classes enable easy access to the pins that serve as input/output points +for the workflow. +""" + +from typing import Iterator, Optional +from ansys.dpf.core import GenericDataContainersCollection +from ansys.dpf.core.custom_container_base import CustomContainerBase +from ansys.dpf.core.dpf_operator import Operator +from ansys.dpf.core.generic_data_container import GenericDataContainer + + +class ExposedPin(CustomContainerBase): + """ + Represents an exposed input or output pin in a workflow. + + This class provides access to the name and the associated operator, as well as its pin ID. + """ + + def __init__(self, container: GenericDataContainer) -> None: + """ + Initialize an ExposedPin object. + + Parameters + ---------- + container : GenericDataContainer + The underlying data container that holds the exposed pin's information. + """ + super().__init__(container) + + self._name: Optional[str] = None + self._operator: Optional[Operator] = None + self._pin_id: Optional[int] = None + + @property + def name(self) -> str: + """ + Retrieve the name of the exposed pin. + + Returns + ------- + str + The name of the exposed pin. + """ + if self._name is None: + self._name = self._container.get_property("name", str) + + return self._name + + @property + def operator(self) -> Operator: + """ + Retrieve the operator associated with the exposed pin. + + Returns + ------- + Operator + The operator associated with this exposed pin. + """ + if self._operator is None: + self._operator = self._container.get_property("operator", Operator) + + return self._operator + + @property + def pin_id(self) -> int: + """ + Retrieve the pin ID of the operator. + + Returns + ------- + int + The pin ID of the operator. + """ + if self._pin_id is None: + self._pin_id = self._container.get_property("pin_id", int) + + return self._pin_id + + def __str__(self) -> str: + """ + Return a string representation of the exposed pin. + + This includes the name and associated operator, with its pin ID. + + Returns + ------- + str + String representation of the exposed pin. + """ + from ansys.dpf.core.helpers.utils import indent + + indents = " " + return ( + "ExposedPin with properties:\n" + " - name:\n" + f"{indent(self.name, indents)}\n" + " - operator:\n" + f"{indent(self.operator.name, indents)}\n" + " - pin_id:\n" + f"{indent(self.pin_id, indents)}" + ) + + +class ExposedPinsCollection: + """ + Represents a collection of exposed pins in a workflow. + + This class provides iterable access to all exposed pins, allowing retrieval + of individual exposed pins or iteration through the entire collection. + """ + + def __init__(self, collection: GenericDataContainersCollection) -> None: + """ + Initialize an ExposedPinsCollection object. + + Parameters + ---------- + collection : GenericDataContainersCollection + The underlying collection of exposed pins. + """ + self._collection = collection + + def __len__(self) -> int: + """ + Return the number of exposed pins in the collection. + + Returns + ------- + int + The number of exposed pins. + """ + return len(self._collection) + + def __getitem__(self, index: int) -> ExposedPin: + """ + Retrieve an exposed pin by its index. + + Parameters + ---------- + index : int + The index of the exposed pin to retrieve. + + Returns + ------- + ExposedPin + The exposed pin at the specified index. + """ + return ExposedPin(self._collection[index]) + + def __iter__(self) -> Iterator[ExposedPin]: + """ + Iterate over the exposed pins in the collection. + + Yields + ------ + ExposedPin + The next exposed pin in the collection. + """ + for i in range(len(self)): + yield self[i] + + def __str__(self) -> str: + """ + Return a string representation of the exposed pins collection. + + Returns + ------- + str + String representation of the collection. + """ + from ansys.dpf.core.helpers.utils import indent + + indents = (" ", " - ") + return "\n".join([indent(exposed_pin, *indents) for exposed_pin in self]) diff --git a/src/ansys/dpf/core/workflow_topology/operator_connection.py b/src/ansys/dpf/core/workflow_topology/operator_connection.py new file mode 100644 index 0000000000..09891ff8da --- /dev/null +++ b/src/ansys/dpf/core/workflow_topology/operator_connection.py @@ -0,0 +1,218 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +OperatorConnection +================== +This module contains the `OperatorConnection` and `OperatorConnectionsCollection` +classes, which represent individual connections between operators and a +collection of such connections within a workflow, respectively. +""" + +from typing import Iterator, Optional +from ansys.dpf.core import GenericDataContainersCollection +from ansys.dpf.core.custom_container_base import CustomContainerBase +from ansys.dpf.core.dpf_operator import Operator +from ansys.dpf.core.generic_data_container import GenericDataContainer + + +class OperatorConnection(CustomContainerBase): + """ + Represents a connection between two operators in a workflow. + + This class provides access to the source and target operators, as well as their respective pin IDs. + """ + + def __init__(self, container: GenericDataContainer) -> None: + """ + Initialize an OperatorConnection object. + + Parameters + ---------- + container : GenericDataContainer + The underlying data container that holds the connection's information. + """ + super().__init__(container) + + self._source_operator: Optional[Operator] = None + self._source_pin_id: Optional[int] = None + self._target_operator: Optional[Operator] = None + self._target_pin_id: Optional[int] = None + + @property + def source_operator(self) -> Operator: + """ + Retrieve the source operator of the connection. + + Returns + ------- + Operator + The operator serving as the source of this connection. + """ + if self._source_operator is None: + self._source_operator = self._container.get_property("source_operator", Operator) + + return self._source_operator + + @property + def source_pin_id(self) -> int: + """ + Retrieve the pin ID of the source operator. + + Returns + ------- + int + The pin ID of the source operator. + """ + if self._source_pin_id is None: + self._source_pin_id = self._container.get_property("source_pin_id", int) + + return self._source_pin_id + + @property + def target_operator(self) -> Operator: + """ + Retrieve the target operator of the connection. + + Returns + ------- + Operator + The operator serving as the target of this connection. + """ + if self._target_operator is None: + self._target_operator = self._container.get_property("target_operator", Operator) + + return self._target_operator + + @property + def target_pin_id(self) -> int: + """ + Retrieve the pin ID of the target operator. + + Returns + ------- + int + The pin ID of the target operator. + """ + if self._target_pin_id is None: + self._target_pin_id = self._container.get_property("target_pin_id", int) + + return self._target_pin_id + + def __str__(self) -> str: + """ + Return a string representation of the operator connection. + + This includes the source and target operators and their respective pin IDs. + + Returns + ------- + str + String representation of the operator connection. + """ + from ansys.dpf.core.helpers.utils import indent + + indents = " " + return ( + "OperatorConnection with properties:\n" + " - source_operator:\n" + f"{indent(self.source_operator.name, indents)}\n" + " - source_pin_id:\n" + f"{indent(self.source_pin_id, indents)}\n" + " - target_operator:\n" + f"{indent(self.target_operator.name, indents)}\n" + " - target_pin_id:\n" + f"{indent(self.target_pin_id, indents)}" + ) + + +class OperatorConnectionsCollection: + """ + Represents a collection of operator connections in a workflow. + + This class provides iterable access to all operator connections, allowing retrieval + of individual connections or iteration through the entire collection. + """ + + def __init__(self, collection: GenericDataContainersCollection) -> None: + """ + Initialize an OperatorConnectionsCollection object. + + Parameters + ---------- + collection : GenericDataContainersCollection + The underlying collection of operator connections. + """ + self._collection = collection + + def __len__(self) -> int: + """ + Return the number of operator connections in the collection. + + Returns + ------- + int + The number of operator connections. + """ + return len(self._collection) + + def __getitem__(self, index: int) -> OperatorConnection: + """ + Retrieve an operator connection by its index. + + Parameters + ---------- + index : int + The index of the operator connection to retrieve. + + Returns + ------- + OperatorConnection + The operator connection at the specified index. + """ + return OperatorConnection(self._collection[index]) + + def __iter__(self) -> Iterator[OperatorConnection]: + """ + Iterate over the operator connections in the collection. + + Yields + ------ + OperatorConnection + The next operator connection in the collection. + """ + for i in range(len(self)): + yield self[i] + + def __str__(self) -> str: + """ + Return a string representation of the operator connections collection. + + Returns + ------- + str + String representation of the collection. + """ + from ansys.dpf.core.helpers.utils import indent + + indents = (" ", " - ") + return "\n".join([indent(operator_connection, *indents) for operator_connection in self]) diff --git a/src/ansys/dpf/core/workflow_topology/workflow_topology.py b/src/ansys/dpf/core/workflow_topology/workflow_topology.py new file mode 100644 index 0000000000..8728afb31a --- /dev/null +++ b/src/ansys/dpf/core/workflow_topology/workflow_topology.py @@ -0,0 +1,185 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +WorkflowTopology +================ +This module contains the `WorkflowTopology` class, which represents +the structure and relationships within a workflow, including its operators, +connections, and exposed input/output pins. +""" + +from typing import Optional +from ansys.dpf.core import OperatorsCollection +from ansys.dpf.core.custom_container_base import CustomContainerBase +from ansys.dpf.core.generic_data_container import GenericDataContainer +from ansys.dpf.core.workflow_topology.data_connection import DataConnectionsCollection +from ansys.dpf.core.workflow_topology.exposed_pin import ExposedPinsCollection +from ansys.dpf.core.workflow_topology.operator_connection import OperatorConnectionsCollection + + +class WorkflowTopology(CustomContainerBase): + """ + Represents the topology of a workflow, including its operators, connections, and exposed input/output pins. + """ + + def __init__(self, container: GenericDataContainer) -> None: + """ + Initialize a WorkflowTopology object. + + Parameters + ---------- + container : GenericDataContainer + The underlying data container that holds the workflow topology information. + """ + super().__init__(container) + + self._operators: Optional[OperatorsCollection] = None + self._operator_connections: Optional[OperatorConnectionsCollection] = None + self._data_connections: Optional[DataConnectionsCollection] = None + self._exposed_inputs: Optional[ExposedPinsCollection] = None + self._exposed_outputs: Optional[ExposedPinsCollection] = None + + @property + def operators(self) -> OperatorsCollection: + """ + Retrieve the operators in the workflow. + + Returns + ------- + OperatorsCollection + A collection of all the operators in the workflow. + """ + if self._operators is None: + self._operators = self._container.get_property("operators", OperatorsCollection) + + return self._operators + + @property + def operator_connections(self) -> OperatorConnectionsCollection: + """ + Retrieve the operator connections in the workflow. + + Returns + ------- + OperatorConnectionsCollection + A collection of all the operator connections in the workflow. + """ + from ansys.dpf.core import GenericDataContainersCollection + + if self._operator_connections is None: + self._operator_connections = OperatorConnectionsCollection( + self._container.get_property( + "operator_connections", GenericDataContainersCollection + ) + ) + + return self._operator_connections + + @property + def data_connections(self) -> DataConnectionsCollection: + """ + Retrieve the data connections in the workflow. + + Returns + ------- + OperatorConnectionsCollection + A collection of all the data connections in the workflow. + """ + from ansys.dpf.core import GenericDataContainersCollection + + if self._data_connections is None: + self._data_connections = DataConnectionsCollection( + self._container.get_property("data_connections", GenericDataContainersCollection) + ) + + return self._data_connections + + @property + def exposed_inputs(self) -> ExposedPinsCollection: + """ + Retrieve the exposed inputs in the workflow. + + Returns + ------- + ExposedPinsCollection + A collection of all the exposed inputs in the workflow. + """ + from ansys.dpf.core import GenericDataContainersCollection + + if self._exposed_inputs is None: + self._exposed_inputs = ExposedPinsCollection( + self._container.get_property("exposed_inputs", GenericDataContainersCollection) + ) + + return self._exposed_inputs + + @property + def exposed_outputs(self) -> ExposedPinsCollection: + """ + Retrieve the exposed outputs in the workflow. + + Returns + ------- + ExposedPinsCollection + A collection of all the exposed outputs in the workflow. + """ + from ansys.dpf.core import GenericDataContainersCollection + + if self._exposed_outputs is None: + self._exposed_outputs = ExposedPinsCollection( + self._container.get_property("exposed_outputs", GenericDataContainersCollection) + ) + + return self._exposed_outputs + + def __str__(self) -> str: + """ + Return a string representation of the workflow topology. + + The string provides details about the workflow's operators, connections, and exposed pins. + + Returns + ------- + str + String representation of the workflow topology. + """ + from ansys.dpf.core.helpers.utils import indent + + def indent_operators(operators): + indents = (" ", " - ") + return "\n".join([indent(operator.name, *indents) for operator in operators]) + + indents = " " + return ( + "WorkflowTopology with properties:\n" + f" - operators (len: {len(self.operators)}):\n" + f"{indent_operators(self.operators)}\n" + f" - operator_connections (len: {len(self.operator_connections)}):\n" + f"{indent(self.operator_connections, indents)}\n" + f" - data_connections (len: {len(self.data_connections)}):\n" + f"{indent(self.data_connections, indents)}\n" + f" - exposed_inputs (len: {len(self.exposed_inputs)}):\n" + f"{indent(self.exposed_inputs, indents)}\n" + f" - exposed_outputs (len: {len(self.exposed_outputs)}):\n" + f"{indent(self.exposed_outputs, indents)}" + ) diff --git a/tests/conftest.py b/tests/conftest.py index 73c3f59ce9..c2e0f6ab95 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -391,6 +391,8 @@ def decorator(func): if version == "5.0" else not SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_6_0 if version == "6.0" + else not SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_10_0 + if version == "10.0" else True, reason=f"Requires server version greater than or equal to {version}", raises=core.errors.DpfVersionNotSupported, diff --git a/tests/test_any.py b/tests/test_any.py index d91c8042b7..468cccd301 100644 --- a/tests/test_any.py +++ b/tests/test_any.py @@ -136,7 +136,7 @@ def test_cast_workflow_any(server_type): @pytest.mark.skipif( not conftest.SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_10_0, - reason="any does not support operator below 8.0", + reason="any does not support operator below 10.0", ) def test_cast_operator_any(server_type): entity = dpf.Operator(server=server_type, name="U") diff --git a/tests/test_operator.py b/tests/test_operator.py index 8cfadd185b..b4c2211cf6 100644 --- a/tests/test_operator.py +++ b/tests/test_operator.py @@ -33,8 +33,11 @@ from ansys import dpf from ansys.dpf.core import errors from ansys.dpf.core import operators as ops +from ansys.dpf.core.common import derived_class_name_to_type, record_derived_class +from ansys.dpf.core.custom_container_base import CustomContainerBase from ansys.dpf.core.misc import get_ansys_path from ansys.dpf.core.operator_specification import Specification +from ansys.dpf.core.workflow_topology import WorkflowTopology import conftest from conftest import ( SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_3_0, @@ -1424,3 +1427,58 @@ def test_operator_input_output_streams(server_in_process, simple_bar): time_provider.connect(pin=3, inpt=streams) times = time_provider.outputs.time_freq_support() assert times + + +@pytest.mark.skipif( + not conftest.SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_10_0, + reason="Operator `workflow_to_workflow_topology` does not exist below 10.0", +) +def test_operator_outputs_derived_class(server_type): + workflow = dpf.core.Workflow(server=server_type) + + workflow_to_workflow_topology_op = dpf.core.Operator( + "workflow_to_workflow_topology", server=server_type + ) + workflow_to_workflow_topology_op.inputs.workflow.connect(workflow) + + workflow_topology = workflow_to_workflow_topology_op.outputs.workflow_topology() + assert workflow_topology + + +@pytest.mark.skipif( + not conftest.SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_10_0, + reason="Operator `workflow_to_workflow_topology` does not exist below 10.0", +) +def test_operator_get_output_derived_class(server_type): + workflow = dpf.core.Workflow(server=server_type) + + workflow_to_workflow_topology_op = dpf.core.Operator( + "workflow_to_workflow_topology", server=server_type + ) + workflow_to_workflow_topology_op.inputs.workflow.connect(workflow) + + workflow_topology = workflow_to_workflow_topology_op.get_output(0, WorkflowTopology) + assert workflow_topology + + +def test_record_derived_type(): + class TestContainer(CustomContainerBase): + pass + + class TestContainer2(CustomContainerBase): + pass + + class_name = "TestContainer" + + derived_classes = derived_class_name_to_type() + assert class_name not in derived_classes + + record_derived_class(class_name, TestContainer) + assert class_name in derived_classes + assert derived_classes[class_name] is TestContainer + + record_derived_class(class_name, TestContainer2) + assert derived_classes[class_name] is TestContainer + + record_derived_class(class_name, TestContainer2, overwrite=True) + assert derived_classes[class_name] is TestContainer2 diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 5576cbe555..40cff92ecf 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -27,6 +27,7 @@ import platform import ansys.dpf.core.operators as op +from ansys.dpf.core.workflow_topology import WorkflowTopology import conftest from ansys import dpf from ansys.dpf.core import misc @@ -1030,6 +1031,26 @@ def test_workflow_input_output_streams(server_in_process, simple_bar): assert times +@pytest.mark.skipif( + not conftest.SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_10_0, + reason="Operator `workflow_to_workflow_topology` does not exist below 10.0", +) +def test_workflow_get_output_derived_class(server_type): + workflow = dpf.core.Workflow(server=server_type) + + workflow_to_workflow_topology_op = dpf.core.Operator( + "workflow_to_workflow_topology", server=server_type + ) + dpf_workflow_wrapper = dpf.core.Workflow(server=server_type) + dpf_workflow_wrapper.add_operator(workflow_to_workflow_topology_op) + dpf_workflow_wrapper.set_input_name("input", workflow_to_workflow_topology_op, 0) + dpf_workflow_wrapper.set_output_name("output", workflow_to_workflow_topology_op, 0) + dpf_workflow_wrapper.connect("input", workflow) + + workflow_topology = dpf_workflow_wrapper.get_output("output", WorkflowTopology) + assert workflow_topology + + def main(): test_connect_field_workflow() velocity_acceleration = conftest.resolve_test_file("velocity_acceleration.rst", "rst_operators") diff --git a/tests/test_workflow_topology.py b/tests/test_workflow_topology.py new file mode 100644 index 0000000000..beea6c01d4 --- /dev/null +++ b/tests/test_workflow_topology.py @@ -0,0 +1,198 @@ +# Copyright (C) 2020 - 2024 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import pytest + +from ansys import dpf +import ansys.dpf.core.operators as op +import conftest +from conftest import raises_for_servers_version_under + + +def workflow_forward(server_type) -> dpf.core.Workflow: + """ + ┌─────────┐ ┌────────────┐ ┌────────────┐ ┌──────────┐ + │"Input 0"├─────►│forward_op_1├─────►│forward_op_1├─────►│"Output 0"│ + └─────────┘ └────────────┘ └────────────┘ └──────────┘ + ┌───────┐ ┌────────────┐ ┌──────────┐ + │"hello"├──────────►│forward_op_3├─────►│"Output 1"│ + └───────┘ └────────────┘ └──────────┘ + """ + + forward_op_1 = op.utility.forward(server=server_type) + forward_op_2 = op.utility.forward(server=server_type) + forward_op_3 = op.utility.forward(server=server_type) + + forward_op_2.inputs.connect(forward_op_1.outputs) + forward_op_3.inputs.connect("hello") + + workflow = dpf.core.Workflow(server=server_type) + + workflow.add_operators([forward_op_1, forward_op_2, forward_op_3]) + + workflow.set_input_name("Input 0", forward_op_1.inputs.any) + workflow.set_output_name("Output 0", forward_op_2.outputs.any) + workflow.set_output_name("Output 1", forward_op_3.outputs.any) + + return workflow + + +def workflow_forward_5(server_type) -> dpf.core.Workflow: + """ + ┌─────────┐ ┌──────────┐ + │"Input 0"├──┐ ┌──►│"Output 0"│ + └─────────┘ │ │ └──────────┘ + ┌─────────┐ │ │ ┌──────────┐ + │"Input 1"├──┤ ├──►│"Output 1"│ + └─────────┘ │ │ └──────────┘ + ┌─────────┐ │ ┌──────────┐ │ ┌──────────┐ + │"Input 2"├──┼──►│forward_op├──┼──►│"Output 2"│ + └─────────┘ │ └──────────┘ │ └──────────┘ + ┌─────────┐ │ │ ┌──────────┐ + │"Input 3"├──┤ ├──►│"Output 3"│ + └─────────┘ │ │ └──────────┘ + ┌─────────┐ │ │ ┌──────────┐ + │"Input 4"├──┘ └──►│"Output 4"│ + └─────────┘ └──────────┘ + """ + + forward_op = op.utility.forward(server=server_type) + + workflow = dpf.core.Workflow(server=server_type) + + workflow.add_operators([forward_op]) + + for i in range(5): + workflow.set_input_name(f"Input {i}", forward_op, i) + workflow.set_output_name(f"Output {i}", forward_op, i) + + return workflow + + +def workflow_disp_min_max(server_type) -> dpf.core.Workflow: + """ + ┌──────────────┐ ┌───────┐ ┌─────────────┐ ┌─────┐ + │"data_sources"├─────►│disp_op├─────►│min_max_fc_op├──┬──►│"min"│ + └──────────────┘ └───────┘ └─────────────┘ │ └─────┘ + │ ┌─────┐ + └──►│"max"│ + └─────┘ + """ + + disp_op = op.result.displacement(server=server_type) + min_max_fc_op = op.min_max.min_max_fc(disp_op, server=server_type) + + workflow = dpf.core.Workflow(server=server_type) + + workflow.add_operators([disp_op, min_max_fc_op]) + + workflow.set_input_name("data_sources", disp_op.inputs.data_sources) + workflow.set_output_name("min", min_max_fc_op.outputs.field_min) + workflow.set_output_name("max", min_max_fc_op.outputs.field_max) + + return workflow + + +workflows = { + "workflow_forward": workflow_forward, + "workflow_forward_5": workflow_forward_5, + "workflow_disp_min_max": workflow_disp_min_max, +} +workflow_topologies = { + "workflow_forward": { + "operators": 3, + "operator_connections": 1, + "data_connections": 1, + "exposed_inputs": 1, + "exposed_outputs": 2, + }, + "workflow_forward_5": { + "operators": 1, + "operator_connections": 0, + "data_connections": 0, + "exposed_inputs": 5, + "exposed_outputs": 5, + }, + "workflow_disp_min_max": { + "operators": 2, + "operator_connections": 1, + "data_connections": 0, + "exposed_inputs": 1, + "exposed_outputs": 2, + }, +} + + +@pytest.fixture( + params=list(workflows.values()), + ids=list(workflows.keys()), +) +def workflow(server_type, request) -> dpf.core.Workflow: + wf = request.param(server_type) + wf.name = list(workflows.keys())[request.param_index] + return wf + + +@pytest.fixture() +def expected_workflow_topology(workflow): + return workflow_topologies[workflow.name] + + +@pytest.mark.skipif( + not conftest.SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_10_0, + reason="Operator `workflow_to_workflow_topology` does not exist below 10.0", +) +def test_instantiate_workflow_to_workflow_topology_op(server_type): + workflow_to_workflow_topology_op = dpf.core.Operator( + "workflow_to_workflow_topology", server=server_type + ) + + assert workflow_to_workflow_topology_op + + +@raises_for_servers_version_under("10.0") +def test_workflow_get_topology(workflow): + workflow_topology = workflow.get_topology() + + assert workflow_topology + + +@raises_for_servers_version_under("10.0") +def test_workflow_topology_sizes(workflow, expected_workflow_topology): + workflow_topology = workflow.get_topology() + + assert len(workflow_topology.operators) == expected_workflow_topology["operators"] + assert ( + len(workflow_topology.operator_connections) + == expected_workflow_topology["operator_connections"] + ) + assert len(workflow_topology.data_connections) == expected_workflow_topology["data_connections"] + assert len(workflow_topology.exposed_inputs) == expected_workflow_topology["exposed_inputs"] + assert len(workflow_topology.exposed_outputs) == expected_workflow_topology["exposed_outputs"] + + +@raises_for_servers_version_under("10.0") +def test_workflow_topology_str(workflow): + workflow_topology = workflow.get_topology() + + # We only check that it does not raise + assert str(workflow_topology)