diff --git a/capellambse_context_diagrams/__init__.py b/capellambse_context_diagrams/__init__.py index d799aed..ab2663b 100644 --- a/capellambse_context_diagrams/__init__.py +++ b/capellambse_context_diagrams/__init__.py @@ -69,6 +69,7 @@ def init() -> None: register_realization_view() register_data_flow_view() register_cable_tree_view() + register_custom_diagram() # register_functional_context() XXX: Future @@ -313,3 +314,12 @@ def register_cable_tree_view() -> None: {}, ), ) + + +def register_custom_diagram() -> None: + """Add the `custom_diagram` attribute to `ModelObject`s.""" + m.set_accessor( + sa.SystemFunction, + "custom_diagram", + context.CustomContextAccessor(DiagramType.SAB.value, {}), + ) diff --git a/capellambse_context_diagrams/collectors/custom.py b/capellambse_context_diagrams/collectors/custom.py new file mode 100644 index 0000000..76e2252 --- /dev/null +++ b/capellambse_context_diagrams/collectors/custom.py @@ -0,0 +1,232 @@ +# SPDX-FileCopyrightText: 2022 Copyright DB InfraGO AG and the capellambse-context-diagrams contributors +# SPDX-License-Identifier: Apache-2.0 + +"""This module defines the collector for the CustomDiagram.""" +from __future__ import annotations + +import collections.abc as cabc +import typing as t + +import capellambse.model as m + +from .. import _elkjs, context +from . import generic, makers + + +class CustomCollector: + """Collect the context for a custom diagram.""" + + def __init__( + self, + diagram: context.ContextDiagram, + params: dict[str, t.Any], + ) -> None: + self.diagram = diagram + self.obj: m.ModelElement = self.diagram.target + self.data = makers.make_diagram(diagram) + self.params = params + self.instructions = self.diagram._collect + self.boxes: dict[str, _elkjs.ELKInputChild] = {} + self.edges: dict[str, _elkjs.ELKInputEdge] = {} + self.ports: dict[str, _elkjs.ELKInputPort] = {} + self.boxes_to_delete: set[str] = set() + if self.diagram._display_parent_relation: + self.diagram_target_owners = list( + generic.get_all_owners(self.diagram.target) + ) + self.common_owners: set[str] = set() + if self.diagram._unify_edge_direction: + self.dicrections: dict[str, bool] = {} + + def __call__(self) -> _elkjs.ELKInputData: + self._make_target(self.obj) + if not self.instructions: + return self._get_data() + self._perform_get(self.obj, self.instructions) + if self.diagram._display_parent_relation and self.obj.owner: + current = self.obj.owner + while ( + current + and self.common_owners + and hasattr(current, "owner") + and not isinstance(current.owner, generic.PackageTypes) + ): + current = self._make_owner_box( + current, + ) + self.common_owners.discard(current.uuid) + for uuid in self.boxes_to_delete: + del self.boxes[uuid] + return self._get_data() + + def _get_data(self) -> t.Any: + self.data.children = list(self.boxes.values()) + self.data.edges = list(self.edges.values()) + return self.data + + def _matches_filters( + self, obj: m.ModelElement, filters: dict[str, t.Any] + ) -> bool: + for key, value in filters.items(): + if getattr(obj, key) != value: + return False + return True + + def _perform_get( + self, obj: m.ModelElement, instructions: dict[str, t.Any] + ) -> None: + if insts := instructions.get("get"): + create = False + elif insts := instructions.get("include"): + create = True + if not insts: + return + if isinstance(insts, dict): + insts = [insts] + assert isinstance(insts, list) + for i in insts: + attr = i.get("name") + assert attr, "Attribute name is required." + target = getattr(obj, attr, None) + if isinstance(target, cabc.Iterable): + filters = i.get("filter", {}) + for item in target: + if not self._matches_filters(item, filters): + continue + if create: + self._make_target(item) + self._perform_get(item, i) + elif isinstance(target, m.ModelElement): + if create: + self._make_target(target) + self._perform_get(target, i) + + def _make_target( + self, obj: m.ModelElement + ) -> _elkjs.ELKInputChild | _elkjs.ELKInputEdge | None: + if _is_edge(obj): + return self._make_edge_and_ports(obj) + return self._make_box(obj, slim_width=self.diagram._slim_center_box) + + def _make_box( + self, + obj: m.ModelElement, + **kwargs: t.Any, + ) -> _elkjs.ELKInputChild: + box = makers.make_box( + obj, + no_symbol=self.diagram._display_symbols_as_boxes, + **kwargs, + ) + self.boxes[obj.uuid] = box + if self.diagram._display_unused_ports: + for attr in generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[ + self.diagram.type + ]: + for port in getattr(obj, attr, []): + self._make_port_and_owner(port) + if self.diagram._display_parent_relation: + current = obj + while ( + current + and current.uuid not in self.diagram_target_owners + and getattr(current, "owner", None) is not None + and not isinstance(current.owner, generic.PackageTypes) + ): + current = self._make_owner_box(current) + self.common_owners.add(current.uuid) + return box + + def _make_owner_box( + self, + obj: t.Any, + ) -> t.Any: + if not (parent_box := self.boxes.get(obj.owner.uuid)): + parent_box = self._make_box( + obj.owner, + layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS, + ) + assert (obj_box := self.boxes.get(obj.uuid)) + for box in (children := parent_box.children): + if box.id == obj.uuid: + box = obj_box + break + else: + children.append(obj_box) + for label in parent_box.labels: + label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS + self.boxes_to_delete.add(obj.uuid) + return obj.owner + + def _make_edge_and_ports( + self, + edge_obj: m.ModelElement, + ) -> _elkjs.ELKInputEdge | None: + src_obj = edge_obj.source + tgt_obj = edge_obj.target + src_owner = src_obj.owner + tgt_owner = tgt_obj.owner + if self.diagram._hide_direct_children: + if ( + getattr(src_owner, "owner", None) == self.obj + or getattr(tgt_owner, "owner", None) == self.obj + ): + return None + if self.diagram._unify_edge_direction: + src_dir = self.dicrections.get(src_owner.uuid) + tgt_dir = self.dicrections.get(tgt_owner.uuid) + if (src_dir is None) and (tgt_dir is None): + self.dicrections[src_owner.uuid] = False + self.dicrections[tgt_owner.uuid] = True + elif src_dir is None: + self.dicrections[src_owner.uuid] = not tgt_dir + elif tgt_dir is None: + self.dicrections[tgt_owner.uuid] = not src_dir + if self.dicrections[src_owner.uuid]: + src_obj, tgt_obj = tgt_obj, src_obj + self._make_port_and_owner(src_obj) + self._make_port_and_owner(tgt_obj) + edge = _elkjs.ELKInputEdge( + id=edge_obj.uuid, + sources=[src_obj.uuid], + targets=[tgt_obj.uuid], + labels=makers.make_label( + edge_obj.name, + ), + ) + self.edges[edge_obj.uuid] = edge + return edge + + def _make_port_and_owner( + self, port_obj: m.ModelElement + ) -> _elkjs.ELKInputPort: + owner_obj = port_obj.owner + if not (box := self.boxes.get(owner_obj.uuid)): + box = self._make_box( + owner_obj, + layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS, + ) + if port := self.ports.get(port_obj.uuid): + return port + port = makers.make_port(port_obj.uuid) + if self.diagram._display_port_labels: + text = port_obj.name or "UNKNOWN" + port.labels = makers.make_label(text) + box.ports.append(port) + self.ports[port_obj.uuid] = port + return port + + +def _is_edge(obj: m.ModelElement) -> bool: + styleclass = obj.xtype.rsplit(":", 1)[-1] + for sub_str in ("Link", "Exchange"): + if sub_str in styleclass: + return True + return False + + +def collector( + diagram: context.ContextDiagram, params: dict[str, t.Any] +) -> _elkjs.ELKInputData: + """Collect data for rendering a custom diagram.""" + return CustomCollector(diagram, params)() diff --git a/capellambse_context_diagrams/context.py b/capellambse_context_diagrams/context.py index d92bf6c..d39acf3 100644 --- a/capellambse_context_diagrams/context.py +++ b/capellambse_context_diagrams/context.py @@ -20,6 +20,7 @@ from . import _elkjs, filters, serializers, styling from .collectors import ( cable_tree, + custom, dataflow_view, exchanges, get_elkdata, @@ -205,6 +206,22 @@ def __get__( # type: ignore return self._get(obj, CableTreeViewDiagram) +class CustomContextAccessor(ContextAccessor): + """Provides access to the custom context diagrams.""" + + def __get__( # type: ignore + self, + obj: m.T | None, + objtype: type | None = None, + ) -> m.Accessor | ContextDiagram: + """Make a CustomDiagram for the given model object.""" + del objtype + if obj is None: # pragma: no cover + return self + assert isinstance(obj, m.ModelElement) + return self._get(obj, CustomDiagram) + + class ContextDiagram(m.AbstractDiagram): """An automatically generated context diagram. @@ -254,6 +271,7 @@ class ContextDiagram(m.AbstractDiagram): * display_unused_ports - Display ports that are not connected to an edge. """ + _collect: dict[str, t.Any] _display_symbols_as_boxes: bool _display_parent_relation: bool _hide_direct_children: bool @@ -263,6 +281,7 @@ class ContextDiagram(m.AbstractDiagram): _port_label_position: str _transparent_background: bool _display_unused_ports: bool + _unify_edge_direction: bool def __init__( self, @@ -282,6 +301,7 @@ def __init__( self._elk_input_data: CollectorOutputData | None = None self.__filters: cabc.MutableSet[str] = self.FilterSet(self) self._default_render_parameters = { + "collect": {}, "display_symbols_as_boxes": False, "display_parent_relation": False, "hide_direct_children": False, @@ -291,6 +311,7 @@ def __init__( "port_label_position": _elkjs.PORT_LABEL_POSITION.OUTSIDE.name, "display_unused_ports": False, "transparent_background": False, + "unify_edge_direction": False, } | default_render_parameters if standard_filter := STANDARD_FILTERS.get(class_): @@ -393,7 +414,11 @@ def __len__(self) -> int: def _create_diagram(self, params: dict[str, t.Any]) -> cdiagram.Diagram: data = self.elk_input_data(params) assert not isinstance(data, tuple) - if not isinstance(self, ClassTreeDiagram) and has_single_child(data): + if ( + not isinstance(self, ClassTreeDiagram) + and not isinstance(self, CustomDiagram) + and has_single_child(data) + ): self._display_derived_interfaces = True data = get_elkdata(self, params) @@ -860,6 +885,58 @@ def name(self) -> str: # type: ignore return f"Cable Tree View of {self.target.name}" +class CustomDiagram(ContextDiagram): + """An automatically generated CustomDiagram Diagram.""" + + _collect: dict[str, t.Any] + _display_symbols_as_boxes: bool + _display_parent_relation: bool + _hide_direct_children: bool + _slim_center_box: bool + _display_port_labels: bool + _port_label_position: str + _transparent_background: bool + _display_unused_ports: bool + _unify_edge_direction: bool + + def __init__( + self, + class_: str, + obj: m.ModelElement, + *, + render_styles: dict[str, styling.Styler] | None = None, + default_render_parameters: dict[str, t.Any], + ) -> None: + default_render_parameters = { + "collect": {}, + "display_symbols_as_boxes": False, + "display_parent_relation": False, + "hide_direct_children": False, + "slim_center_box": True, + "display_port_labels": False, + "port_label_position": _elkjs.PORT_LABEL_POSITION.OUTSIDE.name, + "transparent_background": False, + "display_unused_ports": False, + "unify_edge_direction": False, + } | default_render_parameters + super().__init__( + class_, + obj, + render_styles=render_styles, + default_render_parameters=default_render_parameters, + ) + self.collector = custom.collector + + @property + def uuid(self) -> str: # type: ignore + """Returns the UUID of the diagram.""" + return f"{self.target.uuid}_custom_diagram" + + @property + def name(self) -> str: # type: ignore + return f"Custom Diagram of {self.target.name}" + + def try_to_layout(data: _elkjs.ELKInputData) -> _elkjs.ELKOutputData: """Try calling elkjs, raise a JSONDecodeError if it fails.""" try: