diff --git a/capellambse_context_diagrams/collectors/exchanges.py b/capellambse_context_diagrams/collectors/exchanges.py index 7c8952ab..e7f776ec 100644 --- a/capellambse_context_diagrams/collectors/exchanges.py +++ b/capellambse_context_diagrams/collectors/exchanges.py @@ -9,7 +9,7 @@ import typing as t from capellambse.model import common -from capellambse.model.crosslayer import cs +from capellambse.model.crosslayer import cs, fa from capellambse.model.modeltypes import DiagramType as DT from .. import _elkjs, context @@ -24,20 +24,20 @@ class ExchangeCollector(metaclass=abc.ABCMeta): intermap: dict[str, DT] = { DT.OAB: ("source", "target", "allocated_interactions", "activities"), DT.SAB: ( - "source.parent", - "target.parent", + "source.owner", + "target.owner", "allocated_functional_exchanges", "allocated_functions", ), DT.LAB: ( - "source.parent", - "target.parent", + "source.owner", + "target.owner", "allocated_functional_exchanges", "allocated_functions", ), DT.PAB: ( - "source.parent", - "target.parent", + "source.owner", + "target.owner", "allocated_functional_exchanges", "allocated_functions", ), @@ -62,81 +62,21 @@ def __init__( self.get_alloc_fex = operator.attrgetter(alloc_fex) self.get_alloc_functions = operator.attrgetter(fncs) - def get_functions_and_exchanges( - self, comp: common.GenericElement, interface: common.GenericElement - ) -> tuple[ - list[common.GenericElement], - dict[str, common.GenericElement], - dict[str, common.GenericElement], - ]: - """Return `Function`s, incoming and outgoing - `FunctionalExchange`s for given `Component` and `interface`. - """ - functions, incomings, outgoings = [], {}, {} - alloc_functions = self.get_alloc_functions(comp) - for fex in self.get_alloc_fex(interface): - source = self.get_source(fex) - if source in alloc_functions: - if fex.uuid not in outgoings: - outgoings[fex.uuid] = fex - if source not in functions: - functions.append(source) - - target = self.get_target(fex) - if target in alloc_functions: - if fex.uuid not in incomings: - incomings[fex.uuid] = fex - if target not in functions: - functions.append(target) - - return functions, incomings, outgoings - - def collect_context( - self, comp: common.GenericElement, interface: common.GenericElement - ) -> tuple[ - dict[str, t.Any], - dict[str, common.GenericElement], - dict[str, common.GenericElement], - ]: - functions, incomings, outgoings = self.get_functions_and_exchanges( - comp, interface - ) - components = [] - for cmp in comp.components: - fncs, _, _ = self.get_functions_and_exchanges(cmp, interface) - functions.extend(fncs) - if fncs: - c, incs, outs = self.collect_context(cmp, interface) - components.append(c) - incomings |= incs - outgoings |= outs - - start = { - "element": comp, - "functions": functions, - "components": components, - } - if self.diagram.hide_functions: - start["functions"] = [] - incomings = {} - outgoings = {} - - return start, incomings, outgoings - - def make_ports_and_update_children_size( + def update_children_size( self, data: _elkjs.ELKInputChild, exchanges: t.Sequence[_elkjs.ELKInputEdge], ) -> None: - """Adjust size of functions and make ports.""" + """Adjust size of functions.""" stack_height: int | float = -makers.NEIGHBOR_VMARGIN for child in data.children: inputs, outputs = [], [] obj = self.obj._model.by_uuid(child.id) if isinstance(obj, cs.Component): - self.make_ports_and_update_children_size(child, exchanges) + self.update_children_size(child, exchanges) return - port_ids = {p.uuid for p in obj.inputs + obj.outputs} + + port_ids = {p.id for p in child.ports} for ex in exchanges: source, target = ex.sources[0], ex.targets[0] if source in port_ids: @@ -144,11 +84,6 @@ def make_ports_and_update_children_size( elif target in port_ids: inputs.append(target) - if generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[self.diagram.type]: - child.ports = [ - makers.make_port(i) for i in set(inputs + outputs) - ] - childnum = max(len(inputs), len(outputs)) height = max( child.height + 2 * makers.LABEL_VPAD, @@ -179,8 +114,7 @@ def get_elkdata_for_exchanges( collector = collector_type(diagram, data, params) collector.collect() for comp in data.children: - collector.make_ports_and_update_children_size(comp, data.edges) - + collector.update_children_size(comp, data.edges) return data @@ -203,91 +137,100 @@ def __init__( data: _elkjs.ELKInputData, params: dict[str, t.Any], ) -> None: - self.left = None - self.right = None + self.left: _elkjs.ELKInputChild | None = None + self.right: _elkjs.ELKInputChild | None = None self.incoming_edges = {} self.outgoing_edges = {} super().__init__(diagram, data, params) self.get_left_and_right() - if diagram.include_interface: + if diagram.hide_functions: + assert self.left is not None + self.left.children = [] # type: ignore[unreachable] + assert self.right is not None + self.right.children = [] + self.incoming_edges = {} + self.outgoing_edges = {} + + if diagram.include_interface or diagram.hide_functions: self.add_interface() def get_left_and_right(self) -> None: - made_children: set[str] = set() - - def get_capella_order( - comp: common.GenericElement, functions: list[common.GenericElement] - ) -> list[common.GenericElement]: - alloc_functions = self.get_alloc_functions(comp) - return [fnc for fnc in alloc_functions if fnc in functions] - - def make_boxes(cntxt: dict[str, t.Any]) -> _elkjs.ELKInputChild | None: - comp = cntxt["element"] - functions = cntxt["functions"] - if self.diagram.hide_functions: - functions = [] - - components = cntxt["components"] - if comp.uuid not in made_children: - children = [ - makers.make_box(fnc) - for fnc in functions - if fnc in self.get_alloc_functions(comp) - ] - for cmp in components: - if child := make_boxes(cmp): - children.append(child) - if children: - layout_options = makers.DEFAULT_LABEL_LAYOUT_OPTIONS - else: - layout_options = makers.CENTRIC_LABEL_LAYOUT_OPTIONS - - box = makers.make_box( - comp, no_symbol=True, layout_options=layout_options + try: + self.collect_context() + + port_spread = len(self.outgoing_edges) - len(self.incoming_edges) + _port_spread = len(self.incoming_edges) - len(self.outgoing_edges) + if port_spread <= _port_spread: + self.incoming_edges, self.outgoing_edges = ( + self.outgoing_edges, + self.incoming_edges, ) - box.children = children - made_children.add(comp.uuid) - return box - return None + self.left, self.right = self.right, self.left - try: - comp = self.get_source(self.obj) - left_context, incs, outs = self.collect_context(comp, self.obj) - inc_port_ids = set(ex.target.uuid for ex in incs.values()) - out_port_ids = set(ex.source.uuid for ex in outs.values()) - port_spread = len(out_port_ids) - len(inc_port_ids) - - _comp = self.get_target(self.obj) - right_context, _, _ = self.collect_context(_comp, self.obj) - _inc_port_ids = set(ex.target.uuid for ex in outs.values()) - _out_port_ids = set(ex.source.uuid for ex in incs.values()) - _port_spread = len(_out_port_ids) - len(_inc_port_ids) - - left_context["functions"] = get_capella_order( - comp, left_context["functions"] - ) - right_context["functions"] = get_capella_order( - _comp, right_context["functions"] - ) - if port_spread >= _port_spread: - self.incoming_edges = incs - self.outgoing_edges = outs - else: - self.incoming_edges = outs - self.outgoing_edges = incs - left_context, right_context = right_context, left_context - - if left_child := make_boxes(left_context): - self.data.children.append(left_child) - self.left = left_child - if right_child := make_boxes(right_context): - self.data.children.append(right_child) - self.right = right_child + assert self.left is not None + self.data.children.append(self.left) + assert self.right is not None + self.data.children.append(self.right) except AttributeError as error: logger.exception("Interface collection failed: \n%r", str(error)) + def collect_context(self): + self.left = makers.make_box(self.get_source(self.obj), no_symbol=True) + self.right = makers.make_box(self.get_target(self.obj), no_symbol=True) + boxes: dict[str, _elkjs.ELKInputChild] = { + self.left.id: self.left, + self.right.id: self.right, + } + for fex in self.get_alloc_fex(self.obj): + srid = self.make_all_owners(fex.source, boxes) + trid = self.make_all_owners(fex.target, boxes) + if [srid, trid] == [self.right.id, self.left.id]: + self.incoming_edges[fex.uuid] = fex + elif [srid, trid] == [self.left.id, self.right.id]: + self.outgoing_edges[fex.uuid] = fex + + if self.left.children: + for label in self.left.labels: + label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS + if self.right.children: + for label in self.left.labels: + label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS + + def make_all_owners( + self, obj: fa.AbstractFunction, boxes: dict[str, _elkjs.ELKInputChild] + ) -> str: + owners: list[fa.AbstractFunction | cs.Component] = [] + assert self.right is not None and self.left is not None + root: cs.Component | None = None + for uuid in generic.get_all_owners(obj): + element = self.obj._model.by_uuid(uuid) + if uuid in {self.right.id, self.left.id}: + root = boxes[uuid] + break + + owners.append(element) + + assert root is not None, f"No root found for {obj._short_repr_()}" + owner_box: common.GenericElement = root + for owner in owners[::-1]: + if isinstance(owner, fa.FunctionPort): + owner_box.ports.append(makers.make_port(owner.uuid)) + else: + if owner.uuid in (b.id for b in owner_box.children): + owner_box = boxes[owner.uuid] + continue + + box = boxes.setdefault( + owner.uuid, makers.make_box(owner, no_symbol=True) + ) + owner_box.children.append(box) + for label in owner_box.labels: + label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS + owner_box = box + return root.id + def add_interface(self) -> None: ex_data = generic.ExchangeData( self.obj, @@ -338,48 +281,7 @@ def __init__( super().__init__(diagram, data, params) def collect(self) -> None: - functional_exchanges: list[common.GenericElement] = [] - all_functions: list[common.GenericElement] = [] - made_children: set[str] = {self.obj.uuid} - try: - for interface in self.obj.exchanges: - if self.get_source(interface) == self.obj: - comp = self.get_target(interface) - else: - comp = self.get_source(interface) - - functions, inc, outs = self.get_functions_and_exchanges( - self.obj, interface - ) - if comp.uuid not in made_children: - children = [makers.make_box(c) for c in functions] - if children: - layout_options = makers.DEFAULT_LABEL_LAYOUT_OPTIONS - else: - layout_options = makers.CENTRIC_LABEL_LAYOUT_OPTIONS - - box = makers.make_box(comp, layout_options=layout_options) - box.children = children - self.data.children.append(box) - made_children.add(comp.uuid) - - all_functions.extend(functions) - functional_exchanges.extend(inc | outs) - - self.data.children[0].children = [ - makers.make_box(c) - for c in all_functions - if c in self.obj.functions - ] - except AttributeError: - pass - - for ex in functional_exchanges: - generic.exchange_data_collector( - generic.ExchangeData( - ex, self.data, set(), is_hierarchical=False - ) - ) + raise NotImplementedError() def is_hierarchical( diff --git a/capellambse_context_diagrams/context.py b/capellambse_context_diagrams/context.py index ad0fe259..b4fc0766 100644 --- a/capellambse_context_diagrams/context.py +++ b/capellambse_context_diagrams/context.py @@ -402,9 +402,6 @@ def _create_diagram(self, params: dict[str, t.Any]) -> cdiagram.Diagram: if override := params.pop(param_name, False): setattr(self, param_name, override) - if self.hide_functions: - self.include_interface = True - params["elkdata"] = exchanges.get_elkdata_for_exchanges( self, exchanges.InterfaceContextCollector, params )