Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Apply edge direction correction #132

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 91 additions & 26 deletions capellambse_context_diagrams/collectors/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(
self.global_boxes = {self.centerbox.id: self.centerbox}
self.made_boxes = {self.centerbox.id: self.centerbox}
self.boxes_to_delete = {self.centerbox.id}
self.edges: list[fa.AbstractExchange] = []
self.exchanges: list[fa.AbstractExchange] = []
if self.diagram._display_parent_relation:
self.diagram_target_owners = list(
generic.get_all_owners(self.diagram.target)
Expand Down Expand Up @@ -103,22 +103,56 @@ def process_context(self):
generic.move_parent_boxes_to_owner(
owner_boxes, self.diagram.target, self.data
)
generic.move_edges(owner_boxes, self.edges, self.data)
generic.move_edges(owner_boxes, self.exchanges, self.data)

self.centerbox.height = max(
self.centerbox.height, *stack_heights.values()
)

def _process_port_spread(
self,
exs: list[fa.AbstractExchange],
attr: str,
inc: int,
port_spread: dict[str, int],
owners: dict[str, str],
) -> None:
for ex in exs:
elem = getattr(ex, attr).owner
if (owner := owners.get(elem.uuid)) is None:
try:
owner = [
uuid
for uuid in generic.get_all_owners(elem)
if uuid not in self.diagram_target_owners
][-1]
except (IndexError, AttributeError):
owner = elem.uuid
assert owner is not None
owners[elem.uuid] = owner
port_spread.setdefault(owner, 0)
port_spread[owner] += inc

def _process_exchanges(self) -> tuple[
list[common.GenericElement],
list[generic.ExchangeData],
]:
ports = port_collector(self.diagram.target, self.diagram.type)
connections = port_exchange_collector(ports)
self.centerbox.ports = [makers.make_port(uuid) for uuid in connections]
self.edges = list(chain.from_iterable(connections.values()))
inc, out = port_collector(self.diagram.target, self.diagram.type)
inc_c = port_exchange_collector(inc)
out_c = port_exchange_collector(out)
inc_exchanges = list(chain.from_iterable(inc_c.values()))
out_exchanges = list(chain.from_iterable(out_c.values()))
port_spread: dict[str, int] = {}
owners: dict[str, str] = {}
self._process_port_spread(
inc_exchanges, "source", 1, port_spread, owners
)
self._process_port_spread(
out_exchanges, "target", -1, port_spread, owners
)
self.exchanges = inc_exchanges + out_exchanges
ex_datas: list[generic.ExchangeData] = []
for ex in self.edges:
for ex in self.exchanges:
if is_hierarchical := exchanges.is_hierarchical(
ex, self.centerbox
):
Expand All @@ -138,23 +172,39 @@ def _process_exchanges(self) -> tuple[
self.params,
is_hierarchical,
)
generic.exchange_data_collector(ex_data)
src, tgt = generic.exchange_data_collector(ex_data)
src_owner = owners.get(src.owner.uuid, "")
tgt_owner = owners.get(tgt.owner.uuid, "")
is_inc = tgt.parent == self.diagram.target
is_out = src.parent == self.diagram.target
if is_inc and is_out:
pass
elif (is_out and (port_spread.get(tgt_owner, 0) > 0)) or (
is_inc and (port_spread.get(src_owner, 0) <= 0)
):
elkdata.edges[-1].sources = [tgt.uuid]
elkdata.edges[-1].targets = [src.uuid]
ex_datas.append(ex_data)
except AttributeError:
continue

return ports, ex_datas
self.centerbox.ports = [
makers.make_port(uuid) for uuid in {**inc_c, **out_c}
]
return (inc + out), ex_datas

def _process_ports(self, stack_heights: dict[str, float | int]) -> None:
ports, ex_datas = self._process_exchanges()
for port, local_ports, side in port_context_collector(ex_datas, ports):
_, label_height = helpers.get_text_extent(port.name)
for owner, local_ports, side in port_context_collector(
ex_datas, ports
):
_, label_height = helpers.get_text_extent(owner.name)
height = max(
label_height + 2 * makers.LABEL_VPAD,
makers.PORT_PADDING
+ (makers.PORT_SIZE + makers.PORT_PADDING) * len(local_ports),
)
if box := self.global_boxes.get(port.uuid): # type: ignore[assignment]
if box := self.global_boxes.get(owner.uuid): # type: ignore[assignment]
if box is self.centerbox:
continue
box.ports.extend(
Expand All @@ -163,14 +213,14 @@ def _process_ports(self, stack_heights: dict[str, float | int]) -> None:
box.height += height
else:
box = self._make_box(
port,
owner,
height=height,
no_symbol=self.diagram._display_symbols_as_boxes,
)
box.ports = [makers.make_port(j.uuid) for j in local_ports]

if self.diagram._display_parent_relation:
current = port
current = owner
while (
current
and current.uuid not in self.diagram_target_owners
Expand Down Expand Up @@ -236,38 +286,52 @@ def collector(

def port_collector(
target: common.GenericElement | common.ElementList, diagram_type: DT
) -> list[common.GenericElement]:
) -> tuple[list[common.GenericElement], list[common.GenericElement]]:
"""Savely collect ports from `target`."""

def __collect(target):
all_ports: list[common.GenericElement] = []
incoming_ports: list[common.GenericElement] = []
outgoing_ports: list[common.GenericElement] = []
for attr in generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[diagram_type]:
try:
ports = getattr(target, attr)
if ports and isinstance(
if not ports or not isinstance(
ports[0],
(fa.FunctionPort, fa.ComponentPort, cs.PhysicalPort),
):
all_ports.extend(ports)
continue
if attr == "inputs":
incoming_ports.extend(ports)
elif attr == "ports":
for port in ports:
if port.direction == "IN":
incoming_ports.append(port)
else:
outgoing_ports.append(port)
else:
outgoing_ports.extend(ports)
except AttributeError:
pass
return all_ports
return incoming_ports, outgoing_ports

if isinstance(target, cabc.Iterable):
assert not isinstance(target, common.GenericElement)
all_ports: list[common.GenericElement] = []
incoming_ports: list[common.GenericElement] = []
outgoing_ports: list[common.GenericElement] = []
for obj in target:
all_ports.extend(__collect(obj))
inc, out = __collect(obj)
incoming_ports.extend(inc)
outgoing_ports.extend(out)
else:
all_ports = __collect(target)
return all_ports
incoming_ports, outgoing_ports = __collect(target)
return incoming_ports, outgoing_ports


def _extract_edges(
obj: common.ElementList[common.GenericElement],
obj: common.GenericElement,
attribute: str,
filter: Filter,
) -> common.ElementList[common.GenericElement] | list:
) -> cabc.Iterable[common.GenericElement]:
return filter(getattr(obj, attribute, []))


Expand Down Expand Up @@ -370,7 +434,8 @@ def derive_from_functions(
assert isinstance(diagram.target, cs.Component)
ports = []
for fnc in diagram.target.allocated_functions:
ports.extend(port_collector(fnc, diagram.type))
inc, out = port_collector(fnc, diagram.type)
ports.extend(inc + out)

context_box_ids = {child.id for child in data.children}
components: dict[str, cs.Component] = {}
Expand Down
8 changes: 4 additions & 4 deletions capellambse_context_diagrams/collectors/exchanges.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class ExchangeCollector(metaclass=abc.ABCMeta):
"""Base class for context collection on Exchanges."""

intermap: dict[str, DT] = {
intermap: dict[DT, tuple[str, str, str, str]] = {
DT.OAB: ("source", "target", "allocated_interactions", "activities"),
DT.SAB: (
"source.owner",
Expand Down Expand Up @@ -216,9 +216,9 @@ def make_all_owners(
obj: fa.AbstractFunction | fa.FunctionPort,
boxes: dict[str, _elkjs.ELKInputChild],
) -> str:
owners: list[fa.AbstractFunction | cs.Component] = []
owners: list[common.GenericElement] = []
assert self.right is not None and self.left is not None
root: cs.Component | None = None
root: _elkjs.ELKInputChild | None = None
for uuid in generic.get_all_owners(obj):
element = self.obj._model.by_uuid(uuid)
if uuid in {self.right.id, self.left.id}:
Expand All @@ -230,7 +230,7 @@ def make_all_owners(
if root is None:
raise ValueError(f"No root found for {obj._short_repr_()}")

owner_box: common.GenericElement = root
owner_box = root
for owner in reversed(owners):
if isinstance(owner, fa.FunctionPort):
if owner.uuid in (p.id for p in owner_box.ports):
Expand Down
2 changes: 1 addition & 1 deletion capellambse_context_diagrams/collectors/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def move_edges(

def get_all_owners(obj: common.GenericElement) -> cabc.Iterator[str]:
"""Return the UUIDs from all owners of ``obj``."""
current = obj
current: common.GenericElement | None = obj
while current is not None:
yield current.uuid
current = getattr(current, "owner", None)
24 changes: 11 additions & 13 deletions capellambse_context_diagrams/collectors/tree_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import math
import typing as t

from capellambse.model import common
from capellambse.model.crosslayer import information

from .. import _elkjs, context
Expand All @@ -26,7 +27,6 @@
"elk.direction": "DOWN",
"edgeRouting": "ORTHOGONAL",
}
ASSOC_STYLECLASS = "__Association"


class ClassProcessor:
Expand All @@ -41,7 +41,6 @@ def __init__(
self.data_types: set[str] = set()
self.legend_boxes: list[_elkjs.ELKInputChild] = []
self.all_associations = all_associations
self.edge_counter = 0

def __contains__(self, uuid: str) -> bool:
objects = self.data.children + self.data.edges # type: ignore[operator]
Expand All @@ -55,16 +54,12 @@ def process_class(self, cls: ClassInfo, params: dict[str, t.Any]):
edges = [
assoc
for assoc in self.all_associations
if cls.prop in assoc.navigable_members
if cls.prop in list(assoc.navigable_members)
]
if len(edges) == 1:
edge_id = edges[0].uuid
else:
edge_id = f"{ASSOC_STYLECLASS}:{self.edge_counter}"
self.edge_counter += 1
edge_id = edges[0].uuid
if edge_id not in self.made_edges:
self.made_edges.add(edge_id)
text = cls.prop.name
text = cls.prop.name if cls.prop else ""
if cls.multiplicity is None:
start = end = "1"
else:
Expand Down Expand Up @@ -183,7 +178,7 @@ class ClassInfo:

source: information.Class
target: information.Class | None
prop: information.Property
prop: information.Property | None
partition: int
multiplicity: tuple[str, str] | None
generalizes: information.Class | None = None
Expand Down Expand Up @@ -266,7 +261,10 @@ def get_all_classes(
process_property(property)

if super == "ALL" or (super == "ROOT" and partition == 1):
if root.super and not root.super.is_primitive:
if (
isinstance(root.super, information.Class)
and not root.super.is_primitive
):
for prop in root.super.owned_properties:
process_property(
_PropertyInfo(
Expand Down Expand Up @@ -421,7 +419,7 @@ def _get_property_text(prop: information.Property) -> str:


def _get_legend_labels(
obj: information.datatype.Enumeration | information.Class,
obj: common.GenericElement,
) -> cabc.Iterator[makers._LabelBuilder]:
yield {
"text": obj.name,
Expand All @@ -438,7 +436,7 @@ def _get_legend_labels(
elif isinstance(obj, information.Class):
labels = [_get_property_text(prop) for prop in obj.owned_properties]
else:
return
labels = []
layout_options = DATA_TYPE_LABEL_LAYOUT_OPTIONS
for label in labels:
yield {"text": label, "icon": (0, 0), "layout_options": layout_options}
Loading