Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ewuerger committed May 7, 2024
1 parent c8df194 commit b3c6e96
Show file tree
Hide file tree
Showing 8 changed files with 1,672 additions and 250 deletions.
25 changes: 19 additions & 6 deletions capellambse_context_diagrams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ def register_classes() -> None:
"""Add the `context_diagram` property to the relevant model objects."""
supported_classes: list[SupportedClass] = [
(oa.Entity, DiagramType.OAB, {}),
(oa.OperationalActivity, DiagramType.OAB, {}),
(
oa.OperationalActivity,
DiagramType.OAB,
{"display_parent_relation": True},
),
(oa.OperationalCapability, DiagramType.OCB, {}),
(ctx.Mission, DiagramType.MCB, {}),
(ctx.Capability, DiagramType.MCB, {"display_symbols_as_boxes": False}),
Expand All @@ -69,32 +73,41 @@ def register_classes() -> None:
DiagramType.SAB,
{
"display_symbols_as_boxes": True,
"display_parent_relation": True,
"include_inner_objects": True,
"render_styles": styling.BLUE_ACTOR_FNCS,
},
),
(
ctx.SystemFunction,
DiagramType.SDFB,
DiagramType.SAB,
{"render_styles": styling.BLUE_ACTOR_FNCS},
),
(
la.LogicalComponent,
DiagramType.LAB,
{"render_styles": styling.BLUE_ACTOR_FNCS},
{
"display_parent_relation": True,
"include_inner_objects": True,
"render_styles": styling.BLUE_ACTOR_FNCS,
},
),
(
la.LogicalFunction,
DiagramType.LDFB,
DiagramType.LAB,
{"render_styles": styling.BLUE_ACTOR_FNCS},
),
(
pa.PhysicalComponent,
DiagramType.PAB,
{"render_styles": styling.BLUE_ACTOR_FNCS},
{
"display_parent_relation": True,
"render_styles": styling.BLUE_ACTOR_FNCS,
},
),
(
pa.PhysicalFunction,
DiagramType.PDFB,
DiagramType.PAB,
{"render_styles": styling.BLUE_ACTOR_FNCS},
),
]
Expand Down
5 changes: 3 additions & 2 deletions capellambse_context_diagrams/collectors/dataflow_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import functools
import operator
import typing as t
from itertools import chain

from capellambse.model import modeltypes
from capellambse.model.crosslayer import fa
Expand Down Expand Up @@ -128,7 +129,7 @@ def collector_default(
connections = default.port_exchange_collector(_ports, filter=filter)
in_ports: dict[str, fa.FunctionPort] = {}
out_ports: dict[str, fa.FunctionPort] = {}
for edge in connections:
for edge in (edges := list(chain.from_iterable(connections.values()))):
if edge.source.owner == fnc:
out_ports.setdefault(edge.source.uuid, edge.source)
else:
Expand All @@ -142,7 +143,7 @@ def collector_default(
) * max(len(in_ports), len(out_ports))

ex_datas: list[generic.ExchangeData] = []
for ex in connections:
for ex in edges:
if ex.uuid in made_edges:
continue

Expand Down
108 changes: 47 additions & 61 deletions capellambse_context_diagrams/collectors/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import collections.abc as cabc
import typing as t
from itertools import chain

from capellambse import helpers
from capellambse.model import common
Expand All @@ -22,13 +23,22 @@ def collector(
diagram: context.ContextDiagram, params: dict[str, t.Any] | None = None
) -> _elkjs.ELKInputData:
"""Collect context data from ports of centric box."""
diagram.display_parent_relation = (params or {}).pop(
"display_parent_relation", diagram.display_parent_relation
)
diagram.include_inner_objects = (params or {}).pop(
"include_inner_objects", diagram.include_inner_objects
)
data = generic.collector(diagram, no_symbol=True)
ports = port_collector(diagram.target, diagram.type)
centerbox = data["children"][0]
centerbox["ports"] = [makers.make_port(i.uuid) for i in ports]
connections = port_exchange_collector(ports)
centerbox["ports"] = [
makers.make_port(uuid) for uuid, edges in connections.items() if edges
]
ex_datas: list[generic.ExchangeData] = []
for ex in connections:
edges: common.ElementList[fa.AbstractExchange]
for ex in (edges := list(chain.from_iterable(connections.values()))):
if is_hierarchical := exchanges.is_hierarchical(ex, centerbox):
if not diagram.include_inner_objects:
continue
Expand All @@ -46,21 +56,22 @@ def collector(
continue

global_boxes = {centerbox["id"]: centerbox}
if diagram.display_parent_relation:
made_boxes = {centerbox["id"]: centerbox}
if diagram.display_parent_relation and diagram.target.owner:
box = makers.make_box(
diagram.target.parent,
diagram.target.owner,
no_symbol=diagram.display_symbols_as_boxes,
layout_options=makers.DEFAULT_LABEL_LAYOUT_OPTIONS,
)
box["children"] = [centerbox]
del data["children"][0]
global_boxes[diagram.target.parent.uuid] = box
global_boxes[diagram.target.owner.uuid] = box
made_boxes[diagram.target.owner.uuid] = box

stack_heights: dict[str, float | int] = {
"input": -makers.NEIGHBOR_VMARGIN,
"output": -makers.NEIGHBOR_VMARGIN,
}
child_boxes: list[_elkjs.ELKInputChild] = []
for child, local_ports, side in port_context_collector(ex_datas, ports):
_, label_height = helpers.get_text_extent(child.name)
height = max(
Expand All @@ -82,71 +93,47 @@ def collector(
no_symbol=diagram.display_symbols_as_boxes,
)
box["ports"] = [makers.make_port(j.uuid) for j in local_ports]
if child.parent.uuid == centerbox["id"]:
child_boxes.append(box)
else:
global_boxes[child.uuid] = box

if diagram.display_parent_relation:
if child == diagram.target.parent:
_move_edge_to_local_edges(
box, connections, local_ports, diagram, data
)
elif child.parent == diagram.target.parent:
parent_box = global_boxes[child.parent.uuid]
parent_box.setdefault("children", []).append(
global_boxes.pop(child.uuid)
)
global_boxes[child.uuid] = box
made_boxes[child.uuid] = box

if diagram.display_parent_relation and child.owner is not None:
child_box = global_boxes.pop(child.uuid)
for uuid in generic.get_all_owners(child):
owner = diagram.target._model.by_uuid(uuid)
assert owner is not None
if not (parent_box := global_boxes.get(uuid)):
parent_box = makers.make_box(
owner,
no_symbol=diagram.display_symbols_as_boxes,
)
global_boxes[uuid] = parent_box
made_boxes[uuid] = parent_box

parent_box.setdefault("children", []).append(child_box)
for label in parent_box["labels"]:
label["layoutOptions"] = (
makers.CENTRIC_LABEL_LAYOUT_OPTIONS
makers.DEFAULT_LABEL_LAYOUT_OPTIONS
)

_move_edge_to_local_edges(
parent_box, connections, local_ports, diagram, data
)
child_box = parent_box

stack_heights[side] += makers.NEIGHBOR_VMARGIN + height

del global_boxes[centerbox["id"]]
data["children"].extend(global_boxes.values())
if child_boxes:
centerbox["children"] = child_boxes
centerbox["width"] = makers.EOI_WIDTH
for label in centerbox.get("labels", []):
label.setdefault("layoutOptions", {}).update(
makers.DEFAULT_LABEL_LAYOUT_OPTIONS
)
if diagram.display_parent_relation:
owner_boxes: dict[str, _elkjs.ELKInputChild] = {
uuid: box
for uuid, box in made_boxes.items()
if box.get("children")
}
generic.move_parent_boxes_to_owner(owner_boxes, diagram.target, data)
generic.move_edges(owner_boxes, edges, data)

centerbox["height"] = max(centerbox["height"], *stack_heights.values())
return data


def _move_edge_to_local_edges(
box: _elkjs.ELKInputChild,
connections: list[common.GenericElement],
local_ports: list[common.GenericElement],
diagram: context.ContextDiagram,
data: _elkjs.ELKInputData,
) -> None:
edges_to_remove: list[str] = []
for c in connections:
if (
c.target in local_ports
and c.source in diagram.target.ports
or c.source in local_ports
and c.target in diagram.target.ports
):
for edge in data["edges"]:
if edge["id"] == c.uuid:
box.setdefault("edges", []).append(edge)
edges_to_remove.append(edge["id"])

data["edges"] = [
e for e in data["edges"] if e["id"] not in edges_to_remove
]


def port_collector(
target: common.GenericElement | common.ElementList, diagram_type: DT
) -> list[common.GenericElement]:
Expand Down Expand Up @@ -182,13 +169,12 @@ def port_exchange_collector(
[cabc.Iterable[common.GenericElement]],
cabc.Iterable[common.GenericElement],
] = lambda i: i,
) -> list[common.GenericElement]:
) -> dict[str, common.ElementList[fa.AbstractExchange]]:
"""Collect exchanges from `ports` savely."""
edges: list[common.GenericElement] = []
edges: dict[str, common.ElementList[fa.AbstractExchange]] = {}
for i in ports:
try:
filtered = filter(getattr(i, "exchanges"))
edges.extend(filtered)
edges[i.uuid] = filter(getattr(i, "exchanges"))
except AttributeError:
pass
return edges
Expand Down
81 changes: 80 additions & 1 deletion capellambse_context_diagrams/collectors/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging
import typing as t

from capellambse.model import common
from capellambse.model import common, layers
from capellambse.model.crosslayer import interaction
from capellambse.model.modeltypes import DiagramType as DT

Expand Down Expand Up @@ -43,6 +43,12 @@
"""Default size of marker-ends in pixels."""
MARKER_PADDING = makers.PORT_PADDING
"""Default padding of markers in pixels."""
PackageTypes: tuple[type[common.GenericElement], ...] = (
layers.oa.EntityPkg,
layers.la.LogicalComponentPkg,
layers.ctx.SystemComponentPkg,
layers.pa.PhysicalComponentPkg,
)


def collector(
Expand Down Expand Up @@ -185,3 +191,76 @@ def collect_label(obj: common.GenericElement) -> str | None:
elif isinstance(obj, interaction.AbstractCapabilityInclude):
return "« i »"
return "" if obj.name.startswith("(Unnamed") else obj.name


def move_parent_boxes_to_owner(
boxes: dict[str, _elkjs.ELKInputChild],
obj: common.GenericElement,
data: _elkjs.ELKInputData,
filter_types: tuple[type, ...] = PackageTypes,
) -> None:
"""Move boxes to their owner box."""
boxes_to_remove: list[str] = []
for child in data["children"]:
if not child.get("children"):
continue

owner = obj._model.by_uuid(child["id"])
if (
isinstance(owner, filter_types)
or not (oowner := owner.owner)
or isinstance(oowner, filter_types)
or not (oowner_box := boxes.get(oowner.uuid))
):
continue

oowner_box.setdefault("children", []).append(child)
boxes_to_remove.append(child["id"])

data["children"] = [
b for b in data["children"] if b["id"] not in boxes_to_remove
]


def move_edges(
boxes: dict[str, _elkjs.ELKInputChild],
connections: list[common.GenericElement],
data: _elkjs.ELKInputData,
) -> None:
"""Move edges to boxes."""
edges_to_remove: list[str] = []
for c in connections:
source_owner_uuids = get_all_owners(c.source)
target_owner_uuids = get_all_owners(c.target)
common_owner_uuid = None
for owner in source_owner_uuids:
if owner in set(target_owner_uuids):
common_owner_uuid = owner
break

if not common_owner_uuid or not (
owner_box := boxes.get(common_owner_uuid)
):
continue

for edge in data["edges"]:
if edge["id"] == c.uuid:
owner_box.setdefault("edges", []).append(edge)
edges_to_remove.append(edge["id"])

data["edges"] = [
e for e in data["edges"] if e["id"] not in edges_to_remove
]


def get_all_owners(obj: common.GenericElement) -> list[str]:
"""Return the UUIDs from all owners of ``obj``."""
owners: list[str] = []
current = obj
while current is not None and not isinstance(current, PackageTypes):
owners.append(current.uuid)
try:
current = current.owner
except AttributeError:
break
return owners
Loading

0 comments on commit b3c6e96

Please sign in to comment.