diff --git a/src/ansys/dpf/post/harmonic_mechanical_simulation.py b/src/ansys/dpf/post/harmonic_mechanical_simulation.py index 8ce06d095..ba19e181e 100644 --- a/src/ansys/dpf/post/harmonic_mechanical_simulation.py +++ b/src/ansys/dpf/post/harmonic_mechanical_simulation.py @@ -7,6 +7,8 @@ from typing import List, Optional, Tuple, Union import warnings +from ansys.dpf.core import shell_layers + from ansys.dpf import core as dpf from ansys.dpf.post import locations from ansys.dpf.post.dataframe import DataFrame @@ -56,6 +58,7 @@ def _get_result_workflow( phase_angle_cyclic: Union[float, None] = None, averaging_config: AveragingConfig = AveragingConfig(), rescoping: Optional[_Rescoping] = None, + shell_layer: Optional[shell_layers] = None, ) -> (dpf.Workflow, Union[str, list[str], None], str): """Generate (without evaluating) the Workflow to extract results.""" result_workflow_inputs = _create_result_workflow_inputs( @@ -70,6 +73,7 @@ def _get_result_workflow( sweeping_phase=sweeping_phase, averaging_config=averaging_config, rescoping=rescoping, + shell_layer=shell_layer, ) result_workflows = _create_result_workflows( server=self._model._server, @@ -89,6 +93,7 @@ def _get_result_workflow( location=location, force_elemental_nodal=result_workflows.force_elemental_nodal, averaging_config=averaging_config, + shell_layer=shell_layer, ) output_wf = _connect_averaging_eqv_and_principal_workflows(result_workflows) @@ -135,6 +140,7 @@ def _get_result( external_layer: Union[bool, List[int]] = False, skin: Union[bool, List[int]] = False, averaging_config: AveragingConfig = AveragingConfig(), + shell_layer: Optional[shell_layers] = None, ) -> DataFrame: """Extract results from the simulation. @@ -211,6 +217,8 @@ def _get_result( Per default averaging happens across all bodies. The averaging config can define that averaging happens per body and defines the properties that are used to define a body. + shell_layer: + Shell layer to extract results for. Returns ------- diff --git a/src/ansys/dpf/post/modal_mechanical_simulation.py b/src/ansys/dpf/post/modal_mechanical_simulation.py index e180cabb2..1504fac15 100644 --- a/src/ansys/dpf/post/modal_mechanical_simulation.py +++ b/src/ansys/dpf/post/modal_mechanical_simulation.py @@ -6,6 +6,8 @@ """ from typing import List, Optional, Union +from ansys.dpf.core import shell_layers + from ansys.dpf import core as dpf from ansys.dpf.post import locations from ansys.dpf.post.dataframe import DataFrame @@ -45,6 +47,7 @@ def _get_result_workflow( phase_angle_cyclic: Union[float, None] = None, averaging_config: AveragingConfig = AveragingConfig(), rescoping: Optional[_Rescoping] = None, + shell_layer: Optional[shell_layers] = None, ) -> (dpf.Workflow, Union[str, list[str], None], str): """Generate (without evaluating) the Workflow to extract results.""" result_workflow_inputs = _create_result_workflow_inputs( @@ -57,6 +60,7 @@ def _get_result_workflow( create_operator_callable=self._model.operator, averaging_config=averaging_config, rescoping=rescoping, + shell_layer=shell_layer, ) result_workflows = _create_result_workflows( server=self._model._server, @@ -116,6 +120,7 @@ def _get_result( external_layer: Union[bool, List[int]] = False, skin: Union[bool, List[int]] = False, averaging_config: AveragingConfig = AveragingConfig(), + shell_layer: Optional[int] = None, ) -> DataFrame: """Extract results from the simulation. @@ -185,6 +190,8 @@ def _get_result( Per default averaging happens across all bodies. The averaging config can define that averaging happens per body and defines the properties that are used to define a body. + shell_layer: + Shell layer to extract results for. Returns ------- diff --git a/src/ansys/dpf/post/result_workflows/_build_workflow.py b/src/ansys/dpf/post/result_workflows/_build_workflow.py index 44316d3a6..98b8420f2 100644 --- a/src/ansys/dpf/post/result_workflows/_build_workflow.py +++ b/src/ansys/dpf/post/result_workflows/_build_workflow.py @@ -1,7 +1,7 @@ import dataclasses from typing import Callable, List, Optional, Union -from ansys.dpf.core import Operator, Workflow +from ansys.dpf.core import Operator, Workflow, shell_layers from ansys.dpf.core.available_result import _result_properties from ansys.dpf.gate.common import locations @@ -92,6 +92,7 @@ class _CreateWorkflowInputs: components_to_extract: list[int] should_extract_components: bool averaging_config: AveragingConfig + shell_layer: Optional[shell_layers] sweeping_phase_workflow_inputs: Optional[_SweepingPhaseWorkflowInputs] = None rescoping_workflow_inputs: Optional[_Rescoping] = None @@ -138,15 +139,23 @@ def _create_result_workflows( The resulting workflows are stored in a ResultWorkflows object. """ + force_elemental_nodal = ( + create_workflow_inputs.averaging_workflow_inputs.force_elemental_nodal + ) + + is_nodal = ( + create_workflow_inputs.averaging_workflow_inputs.location == locations.nodal + and not force_elemental_nodal + ) + initial_result_wf = _create_initial_result_workflow( name=create_workflow_inputs.base_name, server=server, + is_nodal=is_nodal, + shell_layer=create_workflow_inputs.shell_layer, create_operator_callable=create_operator_callable, ) - force_elemental_nodal = ( - create_workflow_inputs.averaging_workflow_inputs.force_elemental_nodal - ) average_wf = _create_averaging_workflow( location=create_workflow_inputs.averaging_workflow_inputs.location, has_skin=create_workflow_inputs.has_skin, @@ -242,6 +251,7 @@ def _create_result_workflow_inputs( selection: Selection, create_operator_callable: Callable[[str], Operator], averaging_config: AveragingConfig, + shell_layer: Optional[shell_layers], rescoping: Optional[_Rescoping] = None, amplitude: bool = False, sweeping_phase: Union[float, None] = 0.0, @@ -293,4 +303,5 @@ def _create_result_workflow_inputs( sweeping_phase_workflow_inputs=sweeping_phase_workflow_inputs, averaging_config=averaging_config, rescoping_workflow_inputs=rescoping, + shell_layer=shell_layer, ) diff --git a/src/ansys/dpf/post/result_workflows/_connect_workflow_inputs.py b/src/ansys/dpf/post/result_workflows/_connect_workflow_inputs.py index fff590a59..f79dce277 100644 --- a/src/ansys/dpf/post/result_workflows/_connect_workflow_inputs.py +++ b/src/ansys/dpf/post/result_workflows/_connect_workflow_inputs.py @@ -1,6 +1,12 @@ from typing import Any, Optional -from ansys.dpf.core import MeshedRegion, Scoping, ScopingsContainer, Workflow +from ansys.dpf.core import ( + MeshedRegion, + Scoping, + ScopingsContainer, + Workflow, + shell_layers, +) from ansys.dpf.post.result_workflows._build_workflow import ResultWorkflows from ansys.dpf.post.result_workflows._sub_workflows import ( @@ -86,6 +92,7 @@ def _connect_workflow_inputs( streams_provider: Any, data_sources: Any, averaging_config: AveragingConfig, + shell_layer: Optional[shell_layers] = None, ): """Connects the inputs of the initial result workflow. @@ -147,6 +154,13 @@ def _connect_workflow_inputs( initial_result_workflow.connect(_WfNames.mesh, mesh) + if shell_layer is not None: + if _WfNames.shell_layer not in initial_result_workflow.input_names: + raise RuntimeError( + "The shell_layer input is not supported by this workflow." + ) + initial_result_workflow.connect(_WfNames.shell_layer, shell_layer.value) + if rescoping_workflow: rescoping_workflow.connect(_WfNames.mesh, mesh) if _WfNames.data_sources in rescoping_workflow.input_names: diff --git a/src/ansys/dpf/post/result_workflows/_sub_workflows.py b/src/ansys/dpf/post/result_workflows/_sub_workflows.py index d8b935728..bfc9ceecc 100644 --- a/src/ansys/dpf/post/result_workflows/_sub_workflows.py +++ b/src/ansys/dpf/post/result_workflows/_sub_workflows.py @@ -1,6 +1,12 @@ -from typing import Union - -from ansys.dpf.core import MeshedRegion, StreamsContainer, Workflow, operators +from typing import Optional, Union + +from ansys.dpf.core import ( + MeshedRegion, + StreamsContainer, + Workflow, + operators, + shell_layers, +) from ansys.dpf.gate.common import locations from ansys.dpf.post.misc import _connect_any @@ -165,16 +171,68 @@ def _create_norm_workflow( def _create_initial_result_workflow( - name: str, server, create_operator_callable: _CreateOperatorCallable + name: str, + server, + shell_layer: Optional[shell_layers], + is_nodal: bool, + create_operator_callable: _CreateOperatorCallable, ): initial_result_workflow = Workflow(server=server) initial_result_op = create_operator_callable(name=name) + initial_result_workflow.set_input_name(_WfNames.mesh, initial_result_op, 7) initial_result_workflow.set_input_name(_WfNames.location, initial_result_op, 9) initial_result_workflow.add_operator(initial_result_op) - initial_result_workflow.set_output_name(_WfNames.output_data, initial_result_op, 0) + + forward_shell_layer_op = operators.utility.forward() + initial_result_workflow.add_operator(forward_shell_layer_op) + initial_result_workflow.set_input_name(_WfNames.shell_layer, forward_shell_layer_op) + + # The next section is only needed, because the shell_layer selection does not + # work for elemental and elemental nodal results. + # If elemental results are requested with a chosen shell layer, + # the shell layer is not selected and the results are split into solids + # and shells. Here, we add an additional shell layer selection and merge_shell_solid + # operator to manually merge the results. + # Note that we have to skip this step if the location is nodal, because + # the resulting location is wrong when the shell layer is selected again manually, after + # it was already selected by the initial result operator. + if shell_layer is not None and not is_nodal: + merge_shell_solid_fields = create_operator_callable( + name="merge::solid_shell_fields" + ) + initial_result_workflow.add_operator(merge_shell_solid_fields) + shell_layer_op = operators.utility.change_shell_layers() + shell_layer_op.inputs.merge(True) + initial_result_workflow.add_operator(shell_layer_op) + + initial_result_workflow.set_output_name( + _WfNames.output_data, merge_shell_solid_fields, 0 + ) + shell_layer_op.inputs.fields_container( + initial_result_op.outputs.fields_container + ) + _connect_any( + shell_layer_op.inputs.e_shell_layer, forward_shell_layer_op.outputs.any + ) + + merge_shell_solid_fields.inputs.fields_container( + shell_layer_op.outputs.fields_container_as_fields_container + ) + + # End section for elemental results with shell layer selection + else: + initial_result_workflow.set_output_name( + _WfNames.output_data, initial_result_op, 0 + ) + + if hasattr(initial_result_op.inputs, "shell_layer"): + _connect_any( + initial_result_op.inputs.shell_layer, forward_shell_layer_op.outputs.any + ) + initial_result_workflow.set_input_name( "time_scoping", initial_result_op.inputs.time_scoping ) diff --git a/src/ansys/dpf/post/selection.py b/src/ansys/dpf/post/selection.py index 321431e8e..48a4f7b86 100644 --- a/src/ansys/dpf/post/selection.py +++ b/src/ansys/dpf/post/selection.py @@ -52,6 +52,7 @@ class _WfNames: result = "result" input_data = "input_data" output_data = "output_data" + shell_layer = "shell_layer" def _is_model_cyclic(is_cyclic: str): diff --git a/src/ansys/dpf/post/static_mechanical_simulation.py b/src/ansys/dpf/post/static_mechanical_simulation.py index 0ceb8b979..7f697865c 100644 --- a/src/ansys/dpf/post/static_mechanical_simulation.py +++ b/src/ansys/dpf/post/static_mechanical_simulation.py @@ -6,6 +6,8 @@ """ from typing import List, Optional, Tuple, Union +from ansys.dpf.core import shell_layers + from ansys.dpf import core from ansys.dpf.post import locations from ansys.dpf.post.dataframe import DataFrame @@ -41,6 +43,7 @@ def _get_result_workflow( phase_angle_cyclic: Union[float, None] = None, averaging_config: AveragingConfig = AveragingConfig(), rescoping: Optional[_Rescoping] = None, + shell_layer: Optional[shell_layers] = None, ) -> (core.Workflow, Union[str, list[str], None], str): """Generate (without evaluating) the Workflow to extract results.""" result_workflow_inputs = _create_result_workflow_inputs( @@ -53,6 +56,7 @@ def _get_result_workflow( create_operator_callable=self._model.operator, averaging_config=averaging_config, rescoping=rescoping, + shell_layer=shell_layer, ) result_workflows = _create_result_workflows( server=self._model._server, @@ -72,6 +76,7 @@ def _get_result_workflow( location=location, force_elemental_nodal=result_workflows.force_elemental_nodal, averaging_config=averaging_config, + shell_layer=shell_layer, ) output_wf = _connect_averaging_eqv_and_principal_workflows(result_workflows) @@ -115,6 +120,7 @@ def _get_result( external_layer: Union[bool, List[int]] = False, skin: Union[bool, List[int]] = False, averaging_config: AveragingConfig = AveragingConfig(), + shell_layer: Optional[int] = None, ) -> DataFrame: """Extract results from the simulation. @@ -186,6 +192,8 @@ def _get_result( Per default averaging happens across all bodies. The averaging config can define that averaging happens per body and defines the properties that are used to define a body. + shell_layer: + Shell layer to extract results for. Returns ------- @@ -234,6 +242,7 @@ def _get_result( phase_angle_cyclic=phase_angle_cyclic, averaging_config=averaging_config, rescoping=rescoping, + shell_layer=shell_layer, ) # Evaluate the workflow diff --git a/src/ansys/dpf/post/transient_mechanical_simulation.py b/src/ansys/dpf/post/transient_mechanical_simulation.py index 38d685173..f2ff22d02 100644 --- a/src/ansys/dpf/post/transient_mechanical_simulation.py +++ b/src/ansys/dpf/post/transient_mechanical_simulation.py @@ -6,6 +6,8 @@ """ from typing import List, Optional, Tuple, Union +from ansys.dpf.core import shell_layers + from ansys.dpf import core as dpf from ansys.dpf.post import locations from ansys.dpf.post.dataframe import DataFrame @@ -43,6 +45,7 @@ def _get_result_workflow( selection: Union[Selection, None] = None, averaging_config: AveragingConfig = AveragingConfig(), rescoping: Optional[_Rescoping] = None, + shell_layer: Optional[shell_layers] = None, ) -> (dpf.Workflow, Union[str, list[str], None], str): """Generate (without evaluating) the Workflow to extract results.""" result_workflow_inputs = _create_result_workflow_inputs( @@ -55,6 +58,7 @@ def _get_result_workflow( create_operator_callable=self._model.operator, averaging_config=averaging_config, rescoping=rescoping, + shell_layer=shell_layer, ) result_workflows = _create_result_workflows( server=self._model._server, @@ -115,6 +119,7 @@ def _get_result( external_layer: Union[bool, List[int]] = False, skin: Union[bool, List[int]] = False, averaging_config: AveragingConfig = AveragingConfig(), + shell_layer: Optional[shell_layers] = None, ) -> DataFrame: """Extract results from the simulation. @@ -180,6 +185,8 @@ def _get_result( Per default averaging happens across all bodies. The averaging config can define that averaging happens per body and defines the properties that are used to define a body. + shell_layer: + Shell layer to extract results for. Returns ------- diff --git a/tests/conftest.py b/tests/conftest.py index 199da1aba..b84ac3a0b 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -128,6 +128,34 @@ def static_rst(): return examples.static_rst +@pytest.fixture() +def mixed_shell_solid_model(): + """Resolve the path of the "mixed_shell_solid" result file.""" + return _download_file( + "result_files/extract_shell_layer", "mixed_shell_solid.rst", True, None, False + ) + + +@pytest.fixture() +def mixed_shell_solid_with_contact_model(): + """Resolve the path of the "mixed_shell_solid_with_contact" result file.""" + return _download_file( + "result_files/extract_shell_layer", + "mixed_shell_solid_with_contact.rst", + True, + None, + False, + ) + + +@pytest.fixture() +def two_cubes_contact_model(): + """Resolve the path of the "two_cubes_contact" result file.""" + return _download_file( + "result_files/extract_shell_layer", "two_cubes_contact.rst", True, None, False + ) + + @pytest.fixture() def complex_model(): """Resolve the path of the "msup/plate1.rst" result file.""" @@ -169,7 +197,7 @@ def average_per_body_complex_multi_body(): @dataclasses.dataclass -class ReferenceCsvFiles: +class ReferenceCsvFilesNodal: # reference result with all bodies combined # The node ids of nodes at body interfaces are duplicated combined: pathlib.Path @@ -177,32 +205,64 @@ class ReferenceCsvFiles: per_id: dict[str, pathlib.Path] -def get_per_body_ref_files( - root_path: str, n_bodies: int -) -> dict[str, ReferenceCsvFiles]: +@dataclasses.dataclass +class ReferenceCsvResult: + name: str + has_bodies: bool = True + + +def get_ref_files( + root_path: str, n_bodies: int, results: list[ReferenceCsvResult] +) -> dict[str, ReferenceCsvFilesNodal]: + # Returns a dict of ReferenceCsvFiles for each result_name ref_files = {} - for result in ["stress", "elastic_strain"]: + for result in results: per_mat_id_dict = {} - for mat in range(1, n_bodies + 1): - per_mat_id_dict[str(mat)] = _download_file( - root_path, f"{result}_mat_{mat}.txt", True, None, False - ) + if result.has_bodies: + for mat in range(1, n_bodies + 1): + per_mat_id_dict[str(mat)] = _download_file( + root_path, f"{result.name}_mat_{mat}.txt", True, None, False + ) combined = _download_file( - root_path, f"{result}_combined.txt", True, None, False + root_path, f"{result.name}_combined.txt", True, None, False + ) + ref_files[result.name] = ReferenceCsvFilesNodal( + combined=combined, per_id=per_mat_id_dict ) - ref_files[result] = ReferenceCsvFiles(combined=combined, per_id=per_mat_id_dict) return ref_files @pytest.fixture() def average_per_body_complex_multi_body_ref(): - return get_per_body_ref_files("result_files/average_per_body/complex_multi_body", 7) + return get_ref_files( + "result_files/average_per_body/complex_multi_body", + 7, + results=[ReferenceCsvResult("stress"), ReferenceCsvResult("elastic_strain")], + ) + + +@pytest.fixture() +def shell_layer_multi_body_ref(): + return get_ref_files( + "result_files/extract_shell_layer", + 2, + results=[ + ReferenceCsvResult("stress_top_nodal"), + ReferenceCsvResult("stress_bot_nodal"), + ReferenceCsvResult("stress_top_elemental", False), + ReferenceCsvResult("stress_bot_elemental", False), + ], + ) @pytest.fixture() def average_per_body_two_cubes_ref(): - return get_per_body_ref_files("result_files/average_per_body/two_cubes", 2) + return get_ref_files( + "result_files/average_per_body/two_cubes", + 2, + results=[ReferenceCsvResult("stress"), ReferenceCsvResult("elastic_strain")], + ) @pytest.fixture() diff --git a/tests/test_simulation.py b/tests/test_simulation.py index b60a80737..5d16c6524 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -13,6 +13,7 @@ element_types, natures, operators, + shell_layers, ) from ansys.dpf.gate.common import locations import numpy as np @@ -38,7 +39,7 @@ SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_8_0, SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_9_0, SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_9_1, - ReferenceCsvFiles, + ReferenceCsvFilesNodal, ) @@ -451,6 +452,30 @@ def static_simulation(static_rst): ) +@fixture +def mixed_shell_solid_simulation(mixed_shell_solid_model): + return post.load_simulation( + data_sources=mixed_shell_solid_model, + simulation_type=AvailableSimulationTypes.static_mechanical, + ) + + +@fixture +def mixed_shell_solid_with_contact_simulation(mixed_shell_solid_with_contact_model): + return post.load_simulation( + data_sources=mixed_shell_solid_with_contact_model, + simulation_type=AvailableSimulationTypes.static_mechanical, + ) + + +@fixture +def two_cubes_contact_simulation(two_cubes_contact_model): + return post.load_simulation( + data_sources=two_cubes_contact_model, + simulation_type=AvailableSimulationTypes.static_mechanical, + ) + + @fixture def transient_simulation(plate_msup): return post.load_simulation( @@ -1172,6 +1197,349 @@ def test_skin_layer6(self, static_simulation: post.StaticMechanicalSimulation): ) +def compute_number_of_expected_nodes(on_skin: bool, average_per_body: bool): + n_nodes_per_side = 4 + if on_skin: + # Take all the surfaces and remove nodes at the edges + # and corners that are counted 2 or 3 times. + # Remove the edge that touches both bodies. It is counted + # 3 times + nodes_all_surfaces = n_nodes_per_side**2 * 7 + duplicate_nodes_on_edges = 11 * (n_nodes_per_side - 2) + triplicated_nodes_at_corners = 7 + expected_number_of_nodes = ( + nodes_all_surfaces + - duplicate_nodes_on_edges + - 2 * triplicated_nodes_at_corners + - 2 * n_nodes_per_side + ) + else: + n_solid_nodes = n_nodes_per_side**3 + n_shell_nodes_without_touching = (n_nodes_per_side - 1) * n_nodes_per_side + expected_number_of_nodes = n_solid_nodes + n_shell_nodes_without_touching + + if average_per_body: + # Add boundary nodes again (duplicate nodes at the boundary) + expected_number_of_nodes += n_nodes_per_side + + return expected_number_of_nodes + + +def get_shell_scoping(solid_mesh: MeshedRegion): + split_scoping = operators.scoping.split_on_property_type() + split_scoping.inputs.mesh(solid_mesh) + split_scoping.inputs.label1("mat") + split_scoping.inputs.requested_location(locations.elemental) + + splitted_scoping = split_scoping.eval() + + return splitted_scoping.get_scoping({"mat": 2}) + + +def _check_nodal_across_body_results( + fields_container: FieldsContainer, + expected_results: dict[int, dict[str, float]], + on_skin: bool, +): + number_of_nodes_checked = 0 + assert len(fields_container) == 1 + field = fields_container[0] + for node_id, expected_result_per_node in expected_results.items(): + if node_id in field.scoping.ids: + number_of_nodes_checked += 1 + actual_result = field.get_entity_data_by_id(node_id) + + values_for_node = np.array(list(expected_result_per_node.values())) + assert values_for_node.size > 0 + assert values_for_node.size < 3 + avg_expected_result = np.mean(values_for_node) + + if on_skin and len(values_for_node) > 1: + # Skip elements at the edge that connects the body + # because the averaging on the skin is different. For instance + # 3 skin elements are involved the averaging of the inner elements + continue + assert np.isclose( + actual_result, avg_expected_result, rtol=1e-3 + ), f"{values_for_node}, {node_id}" + return number_of_nodes_checked + + +def _check_nodal_average_per_body_results( + fields_container: FieldsContainer, + expected_results: dict[int, dict[str, float]], +): + number_of_nodes_checked = 0 + for node_id, expected_result_per_node in expected_results.items(): + for material in [1, 2]: + field = fields_container.get_field({"mat": material}) + if node_id in field.scoping.ids: + number_of_nodes_checked += 1 + actual_result = field.get_entity_data_by_id(node_id) + expected_result = expected_result_per_node[str(material)] + assert np.isclose(actual_result, expected_result, rtol=1e-3) + return number_of_nodes_checked + + +def _check_elemental_per_body_results( + fields_container: FieldsContainer, + expected_results: dict[int, float], + shell_elements_scoping: Scoping, + element_id_to_skin_ids: dict[int, list[int]], +): + checked_elements = 0 + + for element_id, expected_value in expected_results.items(): + if element_id not in shell_elements_scoping.ids: + continue + skin_ids = element_id_to_skin_ids[element_id] + for skin_id in skin_ids: + for material in [1, 2]: + field = fields_container.get_field({"mat": material}) + if skin_id in field.scoping.ids: + assert np.isclose( + field.get_entity_data_by_id(skin_id), + expected_value, + rtol=1e-3, + ) + checked_elements += 1 + return checked_elements + + +def _check_elemental_across_body_results( + fields_container: FieldsContainer, + expected_results: dict[int, float], + shell_elements_scoping: Scoping, + element_id_to_skin_ids: dict[int, list[int]], +): + checked_elements = 0 + + for element_id, expected_value in expected_results.items(): + if element_id not in shell_elements_scoping.ids: + continue + skin_ids = element_id_to_skin_ids[element_id] + for skin_id in skin_ids: + assert len(fields_container) == 1 + field = fields_container[0] + if skin_id in field.scoping.ids: + assert np.isclose( + field.get_entity_data_by_id(skin_id), + expected_value, + rtol=1e-3, + ) + checked_elements += 1 + return checked_elements + + +def _get_element_id_to_skin_id_map(skin_mesh: MeshedRegion, solid_mesh: MeshedRegion): + skin_to_element_indices = skin_mesh.property_field("facets_to_ele") + + element_id_to_skin_ids = {} + for skin_id in skin_mesh.elements.scoping.ids: + element_idx = skin_to_element_indices.get_entity_data_by_id(skin_id)[0] + solid_element_id = solid_mesh.elements.scoping.ids[element_idx] + if solid_element_id not in element_id_to_skin_ids: + element_id_to_skin_ids[solid_element_id] = [] + element_id_to_skin_ids[solid_element_id].append(skin_id) + return element_id_to_skin_ids + + +@pytest.mark.parametrize("average_per_body", [False, True]) +@pytest.mark.parametrize("on_skin", [True, False]) +# Note: shell_layer selection with multiple layers (e.g top/bottom) currently not working correctly +# for mixed models. +@pytest.mark.parametrize("shell_layer", [shell_layers.top, shell_layers.bottom]) +@pytest.mark.parametrize("location", [locations.elemental, locations.nodal]) +def test_shell_layer_extraction( + mixed_shell_solid_simulation, + shell_layer_multi_body_ref, + average_per_body, + on_skin, + shell_layer, + location, +): + if not SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_9_1: + return + + shell_layer_names = {shell_layers.top: "top", shell_layers.bottom: "bot"} + + if average_per_body: + averaging_config = AveragingConfig( + body_defining_properties=["mat"], average_per_body=True + ) + else: + averaging_config = AveragingConfig( + body_defining_properties=None, average_per_body=False + ) + + res = mixed_shell_solid_simulation._get_result( + base_name="S", + skin=on_skin, + components=["X"], + location=location, + category=ResultCategory.matrix, + shell_layer=shell_layer, + averaging_config=averaging_config, + ) + + if location == locations.nodal: + expected_results = get_ref_per_body_results_mechanical( + shell_layer_multi_body_ref[ + f"stress_{shell_layer_names[shell_layer]}_nodal" + ], + mixed_shell_solid_simulation.mesh._meshed_region, + ) + + expected_number_of_nodes = compute_number_of_expected_nodes( + on_skin, average_per_body + ) + + if average_per_body: + number_of_nodes_checked = _check_nodal_average_per_body_results( + fields_container=res._fc, + expected_results=expected_results, + ) + else: + number_of_nodes_checked = _check_nodal_across_body_results( + fields_container=res._fc, + expected_results=expected_results, + on_skin=on_skin, + ) + + assert number_of_nodes_checked == expected_number_of_nodes + + else: + ref_result = get_ref_result_per_element( + shell_layer_multi_body_ref[ + f"stress_{shell_layer_names[shell_layer]}_elemental" + ].combined + ) + checked_elements = 0 + + if on_skin: + skin_mesh = res._fc[0].meshed_region + solid_mesh = mixed_shell_solid_simulation.mesh._meshed_region + + shell_elements_scoping = get_shell_scoping(solid_mesh) + element_id_to_skin_ids = _get_element_id_to_skin_id_map( + skin_mesh, solid_mesh + ) + + # Note: In this branch only shell elements are checked, + # since only the shell elements are + # affected by the shell layer extraction. + # The skin of the solid elements is cumbersome to + # extract and check and is skipped here. + if average_per_body: + checked_elements = _check_elemental_per_body_results( + fields_container=res._fc, + expected_results=ref_result, + shell_elements_scoping=shell_elements_scoping, + element_id_to_skin_ids=element_id_to_skin_ids, + ) + else: + checked_elements = _check_elemental_across_body_results( + fields_container=res._fc, + expected_results=ref_result, + shell_elements_scoping=shell_elements_scoping, + element_id_to_skin_ids=element_id_to_skin_ids, + ) + + assert checked_elements == 9 + else: + for element_id, expected_value in ref_result.items(): + if average_per_body: + for material in [1, 2]: + field = res._fc.get_field({"mat": material}) + if element_id in field.scoping.ids: + assert np.isclose( + field.get_entity_data_by_id(element_id), + expected_value, + rtol=1e-3, + ), expected_value + checked_elements += 1 + else: + assert np.isclose( + res._fc[0].get_entity_data_by_id(element_id), + expected_value, + rtol=1e-3, + ), expected_value + checked_elements += 1 + assert checked_elements == 36 + + +@pytest.mark.parametrize( + "average_per_body", + [ + False, + pytest.param( + True, + marks=pytest.mark.xfail( + reason="Failing because scopings without results" + " are not handled correctly in the current implementation." + ), + ), + ], +) +@pytest.mark.parametrize("on_skin", [True, False]) +# Note: shell_layer selection with multiple layers (e.g top/bottom) currently not working correctly +# for mixed models. +@pytest.mark.parametrize("shell_layer", [shell_layers.top, shell_layers.bottom]) +@pytest.mark.parametrize("location", [locations.elemental, locations.nodal]) +@pytest.mark.parametrize( + "simulation_str", + [ + "two_cubes_contact_simulation", + pytest.param( + "mixed_shell_solid_with_contact_simulation", + marks=pytest.mark.xfail( + reason="Failing because scopings without results" + " are not handled correctly in the current implementation." + ), + ), + ], +) +def test_shell_layer_extraction_contacts( + simulation_str, average_per_body, on_skin, shell_layer, location, request +): + # Test some models with contacts, because models with contacts + # result in fields without results which can cause problems in conjunction + # with shell layer extraction. + simulation = request.getfixturevalue(simulation_str) + + if not SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_9_1: + return + + if average_per_body: + averaging_config = AveragingConfig( + body_defining_properties=["mat"], average_per_body=True + ) + else: + averaging_config = AveragingConfig( + body_defining_properties=None, average_per_body=False + ) + + res = simulation._get_result( + base_name="S", + skin=on_skin, + components=["X"], + location=location, + category=ResultCategory.equivalent, + shell_layer=shell_layer, + averaging_config=averaging_config, + ) + + # Just do a rough comparison. + # This test is mainly to check if the + # workflow runs without errors because of + # empty fields for some materials + max_val = res.max().array[0] + if simulation_str == "two_cubes_contact_simulation": + assert max_val > 1 and max_val < 1.1 + else: + assert max_val > 7.7 and max_val < 7.8 + + @pytest.mark.parametrize("skin", all_configuration_ids) @pytest.mark.parametrize("result_name", ["stress", "elastic_strain", "displacement"]) @pytest.mark.parametrize("mode", [None, "principal", "equivalent"]) @@ -3544,7 +3912,7 @@ def get_node_and_data_map( return ReferenceDataItem(node_ids, data_rows) -def get_ref_data_from_csv(mesh: MeshedRegion, csv_file_name: ReferenceCsvFiles): +def get_ref_data_from_csv(mesh: MeshedRegion, csv_file_name: ReferenceCsvFilesNodal): combined_ref_data = get_node_and_data_map(mesh, csv_file_name.combined) per_id_ref_data = {} for mat_id, csv_file in csv_file_name.per_id.items(): @@ -3573,8 +3941,25 @@ def get_bodies_in_scoping(meshed_region: MeshedRegion, scoping: Scoping): return list(set(rescoped_mat_field.data)) +def get_ref_result_per_element( + csv_file_path: pathlib.Path, +): + elemental_data = {} + + with open(csv_file_path) as csv_file: + reader = csv.reader(csv_file, delimiter="\t") + + next(reader, None) + for idx, row in enumerate(reader): + element_id = int(row[0]) + assert elemental_data.get(element_id) is None + elemental_data[element_id] = float(row[1]) + return elemental_data + + def get_ref_result_per_node_and_material( - mesh: MeshedRegion, reference_csv_files: ReferenceCsvFiles + mesh: MeshedRegion, + reference_csv_files: ReferenceCsvFilesNodal, ): # Get the reference data from the csv files. # Returns a dictionary with node_id and mat_id as nested keys. @@ -3627,7 +4012,8 @@ def get_ref_result_per_node_and_material( def get_ref_per_body_results_mechanical( - reference_csv_files: ReferenceCsvFiles, mesh: MeshedRegion + reference_csv_files: ReferenceCsvFilesNodal, + mesh: MeshedRegion, ): return get_ref_result_per_node_and_material(mesh, reference_csv_files)