diff --git a/capellambse_context_diagrams/collectors/default.py b/capellambse_context_diagrams/collectors/default.py index 36b1c5e5..76f6c8dc 100644 --- a/capellambse_context_diagrams/collectors/default.py +++ b/capellambse_context_diagrams/collectors/default.py @@ -49,7 +49,7 @@ def __init__( self.global_boxes = {self.centerbox.id: self.centerbox} self.made_boxes = {self.centerbox.id: self.centerbox} self.boxes_to_delete = {self.centerbox.id} - self.edges: list[fa.AbstractExchange] = [] + self.exchanges: list[fa.AbstractExchange] = [] if self.diagram._display_parent_relation: self.diagram_target_owners = list( generic.get_all_owners(self.diagram.target) @@ -103,22 +103,56 @@ def process_context(self): generic.move_parent_boxes_to_owner( owner_boxes, self.diagram.target, self.data ) - generic.move_edges(owner_boxes, self.edges, self.data) + generic.move_edges(owner_boxes, self.exchanges, self.data) self.centerbox.height = max( self.centerbox.height, *stack_heights.values() ) + def _process_port_spread( + self, + exs: list[fa.AbstractExchange], + attr: str, + inc: int, + port_spread: dict[str, int], + owners: dict[str, str], + ) -> None: + for ex in exs: + elem = getattr(ex, attr).owner + if (owner := owners.get(elem.uuid)) is None: + try: + owner = [ + uuid + for uuid in generic.get_all_owners(elem) + if uuid not in self.diagram_target_owners + ][-1] + except (IndexError, AttributeError): + owner = elem.uuid + assert owner is not None + owners[elem.uuid] = owner + port_spread.setdefault(owner, 0) + port_spread[owner] += inc + def _process_exchanges(self) -> tuple[ list[common.GenericElement], list[generic.ExchangeData], ]: - ports = port_collector(self.diagram.target, self.diagram.type) - connections = port_exchange_collector(ports) - self.centerbox.ports = [makers.make_port(uuid) for uuid in connections] - self.edges = list(chain.from_iterable(connections.values())) + inc, out = port_collector(self.diagram.target, self.diagram.type) + inc_c = port_exchange_collector(inc) + out_c = port_exchange_collector(out) + inc_exchanges = list(chain.from_iterable(inc_c.values())) + out_exchanges = list(chain.from_iterable(out_c.values())) + port_spread: dict[str, int] = {} + owners: dict[str, str] = {} + self._process_port_spread( + inc_exchanges, "source", 1, port_spread, owners + ) + self._process_port_spread( + out_exchanges, "target", -1, port_spread, owners + ) + self.exchanges = inc_exchanges + out_exchanges ex_datas: list[generic.ExchangeData] = [] - for ex in self.edges: + for ex in self.exchanges: if is_hierarchical := exchanges.is_hierarchical( ex, self.centerbox ): @@ -138,23 +172,39 @@ def _process_exchanges(self) -> tuple[ self.params, is_hierarchical, ) - generic.exchange_data_collector(ex_data) + src, tgt = generic.exchange_data_collector(ex_data) + src_owner = owners.get(src.owner.uuid, "") + tgt_owner = owners.get(tgt.owner.uuid, "") + is_inc = tgt.parent == self.diagram.target + is_out = src.parent == self.diagram.target + if is_inc and is_out: + pass + elif (is_out and (port_spread.get(tgt_owner, 0) > 0)) or ( + is_inc and (port_spread.get(src_owner, 0) <= 0) + ): + elkdata.edges[-1].sources = [tgt.uuid] + elkdata.edges[-1].targets = [src.uuid] ex_datas.append(ex_data) except AttributeError: continue - return ports, ex_datas + self.centerbox.ports = [ + makers.make_port(uuid) for uuid in {**inc_c, **out_c} + ] + return (inc + out), ex_datas def _process_ports(self, stack_heights: dict[str, float | int]) -> None: ports, ex_datas = self._process_exchanges() - for port, local_ports, side in port_context_collector(ex_datas, ports): - _, label_height = helpers.get_text_extent(port.name) + for owner, local_ports, side in port_context_collector( + ex_datas, ports + ): + _, label_height = helpers.get_text_extent(owner.name) height = max( label_height + 2 * makers.LABEL_VPAD, makers.PORT_PADDING + (makers.PORT_SIZE + makers.PORT_PADDING) * len(local_ports), ) - if box := self.global_boxes.get(port.uuid): # type: ignore[assignment] + if box := self.global_boxes.get(owner.uuid): # type: ignore[assignment] if box is self.centerbox: continue box.ports.extend( @@ -163,14 +213,14 @@ def _process_ports(self, stack_heights: dict[str, float | int]) -> None: box.height += height else: box = self._make_box( - port, + owner, height=height, no_symbol=self.diagram._display_symbols_as_boxes, ) box.ports = [makers.make_port(j.uuid) for j in local_ports] if self.diagram._display_parent_relation: - current = port + current = owner while ( current and current.uuid not in self.diagram_target_owners @@ -236,38 +286,52 @@ def collector( def port_collector( target: common.GenericElement | common.ElementList, diagram_type: DT -) -> list[common.GenericElement]: +) -> tuple[list[common.GenericElement], list[common.GenericElement]]: """Savely collect ports from `target`.""" def __collect(target): - all_ports: list[common.GenericElement] = [] + incoming_ports: list[common.GenericElement] = [] + outgoing_ports: list[common.GenericElement] = [] for attr in generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[diagram_type]: try: ports = getattr(target, attr) - if ports and isinstance( + if not ports or not isinstance( ports[0], (fa.FunctionPort, fa.ComponentPort, cs.PhysicalPort), ): - all_ports.extend(ports) + continue + if attr == "inputs": + incoming_ports.extend(ports) + elif attr == "ports": + for port in ports: + if port.direction == "IN": + incoming_ports.append(port) + else: + outgoing_ports.append(port) + else: + outgoing_ports.extend(ports) except AttributeError: pass - return all_ports + return incoming_ports, outgoing_ports if isinstance(target, cabc.Iterable): assert not isinstance(target, common.GenericElement) - all_ports: list[common.GenericElement] = [] + incoming_ports: list[common.GenericElement] = [] + outgoing_ports: list[common.GenericElement] = [] for obj in target: - all_ports.extend(__collect(obj)) + inc, out = __collect(obj) + incoming_ports.extend(inc) + outgoing_ports.extend(out) else: - all_ports = __collect(target) - return all_ports + incoming_ports, outgoing_ports = __collect(target) + return incoming_ports, outgoing_ports def _extract_edges( - obj: common.ElementList[common.GenericElement], + obj: common.GenericElement, attribute: str, filter: Filter, -) -> common.ElementList[common.GenericElement] | list: +) -> cabc.Iterable[common.GenericElement]: return filter(getattr(obj, attribute, [])) @@ -370,7 +434,8 @@ def derive_from_functions( assert isinstance(diagram.target, cs.Component) ports = [] for fnc in diagram.target.allocated_functions: - ports.extend(port_collector(fnc, diagram.type)) + inc, out = port_collector(fnc, diagram.type) + ports.extend(inc + out) context_box_ids = {child.id for child in data.children} components: dict[str, cs.Component] = {} diff --git a/capellambse_context_diagrams/collectors/exchanges.py b/capellambse_context_diagrams/collectors/exchanges.py index 16798403..a7baae83 100644 --- a/capellambse_context_diagrams/collectors/exchanges.py +++ b/capellambse_context_diagrams/collectors/exchanges.py @@ -22,7 +22,7 @@ class ExchangeCollector(metaclass=abc.ABCMeta): """Base class for context collection on Exchanges.""" - intermap: dict[str, DT] = { + intermap: dict[DT, tuple[str, str, str, str]] = { DT.OAB: ("source", "target", "allocated_interactions", "activities"), DT.SAB: ( "source.owner", @@ -216,9 +216,9 @@ def make_all_owners( obj: fa.AbstractFunction | fa.FunctionPort, boxes: dict[str, _elkjs.ELKInputChild], ) -> str: - owners: list[fa.AbstractFunction | cs.Component] = [] + owners: list[common.GenericElement] = [] assert self.right is not None and self.left is not None - root: cs.Component | None = None + root: _elkjs.ELKInputChild | None = None for uuid in generic.get_all_owners(obj): element = self.obj._model.by_uuid(uuid) if uuid in {self.right.id, self.left.id}: @@ -230,7 +230,7 @@ def make_all_owners( if root is None: raise ValueError(f"No root found for {obj._short_repr_()}") - owner_box: common.GenericElement = root + owner_box = root for owner in reversed(owners): if isinstance(owner, fa.FunctionPort): if owner.uuid in (p.id for p in owner_box.ports): diff --git a/capellambse_context_diagrams/collectors/generic.py b/capellambse_context_diagrams/collectors/generic.py index 41927d9e..94c8e6ea 100644 --- a/capellambse_context_diagrams/collectors/generic.py +++ b/capellambse_context_diagrams/collectors/generic.py @@ -254,7 +254,7 @@ def move_edges( def get_all_owners(obj: common.GenericElement) -> cabc.Iterator[str]: """Return the UUIDs from all owners of ``obj``.""" - current = obj + current: common.GenericElement | None = obj while current is not None: yield current.uuid current = getattr(current, "owner", None) diff --git a/capellambse_context_diagrams/collectors/tree_view.py b/capellambse_context_diagrams/collectors/tree_view.py index fdd58d55..be396189 100644 --- a/capellambse_context_diagrams/collectors/tree_view.py +++ b/capellambse_context_diagrams/collectors/tree_view.py @@ -10,6 +10,7 @@ import math import typing as t +from capellambse.model import common from capellambse.model.crosslayer import information from .. import _elkjs, context @@ -26,7 +27,6 @@ "elk.direction": "DOWN", "edgeRouting": "ORTHOGONAL", } -ASSOC_STYLECLASS = "__Association" class ClassProcessor: @@ -41,7 +41,6 @@ def __init__( self.data_types: set[str] = set() self.legend_boxes: list[_elkjs.ELKInputChild] = [] self.all_associations = all_associations - self.edge_counter = 0 def __contains__(self, uuid: str) -> bool: objects = self.data.children + self.data.edges # type: ignore[operator] @@ -55,16 +54,12 @@ def process_class(self, cls: ClassInfo, params: dict[str, t.Any]): edges = [ assoc for assoc in self.all_associations - if cls.prop in assoc.navigable_members + if cls.prop in list(assoc.navigable_members) ] - if len(edges) == 1: - edge_id = edges[0].uuid - else: - edge_id = f"{ASSOC_STYLECLASS}:{self.edge_counter}" - self.edge_counter += 1 + edge_id = edges[0].uuid if edge_id not in self.made_edges: self.made_edges.add(edge_id) - text = cls.prop.name + text = cls.prop.name if cls.prop else "" if cls.multiplicity is None: start = end = "1" else: @@ -183,7 +178,7 @@ class ClassInfo: source: information.Class target: information.Class | None - prop: information.Property + prop: information.Property | None partition: int multiplicity: tuple[str, str] | None generalizes: information.Class | None = None @@ -266,7 +261,10 @@ def get_all_classes( process_property(property) if super == "ALL" or (super == "ROOT" and partition == 1): - if root.super and not root.super.is_primitive: + if ( + isinstance(root.super, information.Class) + and not root.super.is_primitive + ): for prop in root.super.owned_properties: process_property( _PropertyInfo( @@ -421,7 +419,7 @@ def _get_property_text(prop: information.Property) -> str: def _get_legend_labels( - obj: information.datatype.Enumeration | information.Class, + obj: common.GenericElement, ) -> cabc.Iterator[makers._LabelBuilder]: yield { "text": obj.name, @@ -438,7 +436,7 @@ def _get_legend_labels( elif isinstance(obj, information.Class): labels = [_get_property_text(prop) for prop in obj.owned_properties] else: - return + labels = [] layout_options = DATA_TYPE_LABEL_LAYOUT_OPTIONS for label in labels: yield {"text": label, "icon": (0, 0), "layout_options": layout_options}