Skip to content

Commit

Permalink
fix(context-diagram): Rewrite collection for Interface Context
Browse files Browse the repository at this point in the history
  • Loading branch information
ewuerger committed Jul 16, 2024
1 parent fd9ad24 commit c76d2f5
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 195 deletions.
286 changes: 94 additions & 192 deletions capellambse_context_diagrams/collectors/exchanges.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import typing as t

from capellambse.model import common
from capellambse.model.crosslayer import cs
from capellambse.model.crosslayer import cs, fa
from capellambse.model.modeltypes import DiagramType as DT

from .. import _elkjs, context
Expand All @@ -24,20 +24,20 @@ class ExchangeCollector(metaclass=abc.ABCMeta):
intermap: dict[str, DT] = {
DT.OAB: ("source", "target", "allocated_interactions", "activities"),
DT.SAB: (
"source.parent",
"target.parent",
"source.owner",
"target.owner",
"allocated_functional_exchanges",
"allocated_functions",
),
DT.LAB: (
"source.parent",
"target.parent",
"source.owner",
"target.owner",
"allocated_functional_exchanges",
"allocated_functions",
),
DT.PAB: (
"source.parent",
"target.parent",
"source.owner",
"target.owner",
"allocated_functional_exchanges",
"allocated_functions",
),
Expand All @@ -62,93 +62,28 @@ def __init__(
self.get_alloc_fex = operator.attrgetter(alloc_fex)
self.get_alloc_functions = operator.attrgetter(fncs)

def get_functions_and_exchanges(
self, comp: common.GenericElement, interface: common.GenericElement
) -> tuple[
list[common.GenericElement],
dict[str, common.GenericElement],
dict[str, common.GenericElement],
]:
"""Return `Function`s, incoming and outgoing
`FunctionalExchange`s for given `Component` and `interface`.
"""
functions, incomings, outgoings = [], {}, {}
alloc_functions = self.get_alloc_functions(comp)
for fex in self.get_alloc_fex(interface):
source = self.get_source(fex)
if source in alloc_functions:
if fex.uuid not in outgoings:
outgoings[fex.uuid] = fex
if source not in functions:
functions.append(source)

target = self.get_target(fex)
if target in alloc_functions:
if fex.uuid not in incomings:
incomings[fex.uuid] = fex
if target not in functions:
functions.append(target)

return functions, incomings, outgoings

def collect_context(
self, comp: common.GenericElement, interface: common.GenericElement
) -> tuple[
dict[str, t.Any],
dict[str, common.GenericElement],
dict[str, common.GenericElement],
]:
functions, incomings, outgoings = self.get_functions_and_exchanges(
comp, interface
)
components = []
for cmp in comp.components:
fncs, _, _ = self.get_functions_and_exchanges(cmp, interface)
functions.extend(fncs)
if fncs:
c, incs, outs = self.collect_context(cmp, interface)
components.append(c)
incomings |= incs
outgoings |= outs

start = {
"element": comp,
"functions": functions,
"components": components,
}
if self.diagram.hide_functions:
start["functions"] = []
incomings = {}
outgoings = {}

return start, incomings, outgoings

def make_ports_and_update_children_size(
def update_children_size(
self,
data: _elkjs.ELKInputChild,
exchanges: t.Sequence[_elkjs.ELKInputEdge],
) -> None:
"""Adjust size of functions and make ports."""
"""Adjust size of functions."""
stack_height: int | float = -makers.NEIGHBOR_VMARGIN
for child in data.children:
inputs, outputs = [], []
obj = self.obj._model.by_uuid(child.id)
if isinstance(obj, cs.Component):
self.make_ports_and_update_children_size(child, exchanges)
self.update_children_size(child, exchanges)
return
port_ids = {p.uuid for p in obj.inputs + obj.outputs}

port_ids = {p.id for p in child.ports}
for ex in exchanges:
source, target = ex.sources[0], ex.targets[0]
if source in port_ids:
outputs.append(source)
elif target in port_ids:
inputs.append(target)

if generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[self.diagram.type]:
child.ports = [
makers.make_port(i) for i in set(inputs + outputs)
]

childnum = max(len(inputs), len(outputs))
height = max(
child.height + 2 * makers.LABEL_VPAD,
Expand Down Expand Up @@ -179,8 +114,7 @@ def get_elkdata_for_exchanges(
collector = collector_type(diagram, data, params)
collector.collect()
for comp in data.children:
collector.make_ports_and_update_children_size(comp, data.edges)

collector.update_children_size(comp, data.edges)
return data


Expand All @@ -203,91 +137,100 @@ def __init__(
data: _elkjs.ELKInputData,
params: dict[str, t.Any],
) -> None:
self.left = None
self.right = None
self.left: _elkjs.ELKInputChild | None = None
self.right: _elkjs.ELKInputChild | None = None
self.incoming_edges = {}
self.outgoing_edges = {}

super().__init__(diagram, data, params)

self.get_left_and_right()
if diagram.include_interface:
if diagram.hide_functions:
assert self.left is not None
self.left.children = [] # type: ignore[unreachable]
assert self.right is not None
self.right.children = []
self.incoming_edges = {}
self.outgoing_edges = {}

if diagram.include_interface or diagram.hide_functions:
self.add_interface()

def get_left_and_right(self) -> None:
made_children: set[str] = set()

def get_capella_order(
comp: common.GenericElement, functions: list[common.GenericElement]
) -> list[common.GenericElement]:
alloc_functions = self.get_alloc_functions(comp)
return [fnc for fnc in alloc_functions if fnc in functions]

def make_boxes(cntxt: dict[str, t.Any]) -> _elkjs.ELKInputChild | None:
comp = cntxt["element"]
functions = cntxt["functions"]
if self.diagram.hide_functions:
functions = []

components = cntxt["components"]
if comp.uuid not in made_children:
children = [
makers.make_box(fnc)
for fnc in functions
if fnc in self.get_alloc_functions(comp)
]
for cmp in components:
if child := make_boxes(cmp):
children.append(child)
if children:
layout_options = makers.DEFAULT_LABEL_LAYOUT_OPTIONS
else:
layout_options = makers.CENTRIC_LABEL_LAYOUT_OPTIONS

box = makers.make_box(
comp, no_symbol=True, layout_options=layout_options
try:
self.collect_context()

port_spread = len(self.outgoing_edges) - len(self.incoming_edges)
_port_spread = len(self.incoming_edges) - len(self.outgoing_edges)
if port_spread <= _port_spread:
self.incoming_edges, self.outgoing_edges = (
self.outgoing_edges,
self.incoming_edges,
)
box.children = children
made_children.add(comp.uuid)
return box
return None
self.left, self.right = self.right, self.left

try:
comp = self.get_source(self.obj)
left_context, incs, outs = self.collect_context(comp, self.obj)
inc_port_ids = set(ex.target.uuid for ex in incs.values())
out_port_ids = set(ex.source.uuid for ex in outs.values())
port_spread = len(out_port_ids) - len(inc_port_ids)

_comp = self.get_target(self.obj)
right_context, _, _ = self.collect_context(_comp, self.obj)
_inc_port_ids = set(ex.target.uuid for ex in outs.values())
_out_port_ids = set(ex.source.uuid for ex in incs.values())
_port_spread = len(_out_port_ids) - len(_inc_port_ids)

left_context["functions"] = get_capella_order(
comp, left_context["functions"]
)
right_context["functions"] = get_capella_order(
_comp, right_context["functions"]
)
if port_spread >= _port_spread:
self.incoming_edges = incs
self.outgoing_edges = outs
else:
self.incoming_edges = outs
self.outgoing_edges = incs
left_context, right_context = right_context, left_context

if left_child := make_boxes(left_context):
self.data.children.append(left_child)
self.left = left_child
if right_child := make_boxes(right_context):
self.data.children.append(right_child)
self.right = right_child
assert self.left is not None
self.data.children.append(self.left)
assert self.right is not None
self.data.children.append(self.right)
except AttributeError as error:
logger.exception("Interface collection failed: \n%r", str(error))

def collect_context(self):
self.left = makers.make_box(self.get_source(self.obj), no_symbol=True)
self.right = makers.make_box(self.get_target(self.obj), no_symbol=True)
boxes: dict[str, _elkjs.ELKInputChild] = {
self.left.id: self.left,
self.right.id: self.right,
}
for fex in self.get_alloc_fex(self.obj):
srid = self.make_all_owners(fex.source, boxes)
trid = self.make_all_owners(fex.target, boxes)
if [srid, trid] == [self.right.id, self.left.id]:
self.incoming_edges[fex.uuid] = fex
elif [srid, trid] == [self.left.id, self.right.id]:
self.outgoing_edges[fex.uuid] = fex

if self.left.children:
for label in self.left.labels:
label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS
if self.right.children:
for label in self.left.labels:
label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS

def make_all_owners(
self, obj: fa.AbstractFunction, boxes: dict[str, _elkjs.ELKInputChild]
) -> str:
owners: list[fa.AbstractFunction | cs.Component] = []
assert self.right is not None and self.left is not None
root: cs.Component | None = None
for uuid in generic.get_all_owners(obj):
element = self.obj._model.by_uuid(uuid)
if uuid in {self.right.id, self.left.id}:
root = boxes[uuid]
break

owners.append(element)

assert root is not None, f"No root found for {obj._short_repr_()}"
owner_box: common.GenericElement = root
for owner in owners[::-1]:
if isinstance(owner, fa.FunctionPort):
owner_box.ports.append(makers.make_port(owner.uuid))
else:
if owner.uuid in (b.id for b in owner_box.children):
owner_box = boxes[owner.uuid]
continue

box = boxes.setdefault(
owner.uuid, makers.make_box(owner, no_symbol=True)
)
owner_box.children.append(box)
for label in owner_box.labels:
label.layoutOptions = makers.DEFAULT_LABEL_LAYOUT_OPTIONS
owner_box = box
return root.id

def add_interface(self) -> None:
ex_data = generic.ExchangeData(
self.obj,
Expand Down Expand Up @@ -342,48 +285,7 @@ def __init__(
super().__init__(diagram, data, params)

def collect(self) -> None:
functional_exchanges: list[common.GenericElement] = []
all_functions: list[common.GenericElement] = []
made_children: set[str] = {self.obj.uuid}
try:
for interface in self.obj.exchanges:
if self.get_source(interface) == self.obj:
comp = self.get_target(interface)
else:
comp = self.get_source(interface)

functions, inc, outs = self.get_functions_and_exchanges(
self.obj, interface
)
if comp.uuid not in made_children:
children = [makers.make_box(c) for c in functions]
if children:
layout_options = makers.DEFAULT_LABEL_LAYOUT_OPTIONS
else:
layout_options = makers.CENTRIC_LABEL_LAYOUT_OPTIONS

box = makers.make_box(comp, layout_options=layout_options)
box.children = children
self.data.children.append(box)
made_children.add(comp.uuid)

all_functions.extend(functions)
functional_exchanges.extend(inc | outs)

self.data.children[0].children = [
makers.make_box(c)
for c in all_functions
if c in self.obj.functions
]
except AttributeError:
pass

for ex in functional_exchanges:
generic.exchange_data_collector(
generic.ExchangeData(
ex, self.data, set(), is_hierarchical=False
)
)
raise NotImplementedError()


def is_hierarchical(
Expand Down
3 changes: 0 additions & 3 deletions capellambse_context_diagrams/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,6 @@ def _create_diagram(self, params: dict[str, t.Any]) -> cdiagram.Diagram:
if override := params.pop(param_name, False):
setattr(self, param_name, override)

if self.hide_functions:
self.include_interface = True

params["elkdata"] = exchanges.get_elkdata_for_exchanges(
self, exchanges.InterfaceContextCollector, params
)
Expand Down

0 comments on commit c76d2f5

Please sign in to comment.