Skip to content

Commit

Permalink
refactor: Refactor Dataflow collector (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
huyenngn authored Jun 11, 2024
1 parent 8ee1a41 commit add9739
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 82 deletions.
125 changes: 44 additions & 81 deletions capellambse_context_diagrams/collectors/dataflow_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
from .. import _elkjs, context
from . import default, generic, makers, portless

COLLECTOR_PARAMS: dict[modeltypes.DiagramType, dict[str, t.Any]] = {
modeltypes.DiagramType.OAIB: {"attribute": "involved_activities"},
modeltypes.DiagramType.SDFB: {
"attribute": "involved_functions",
"filter_attrs": ("source.owner", "target.owner"),
"port_collector": default.port_collector,
},
}


def only_involved(
exchanges: cabc.Iterable[fa.FunctionalExchange],
Expand Down Expand Up @@ -47,61 +56,12 @@ def collector(
] = only_involved,
) -> _elkjs.ELKInputData:
"""Main collector that calls either default or portless collectors."""
return COLLECTORS[diagram.type](diagram, params, exchange_filter)


def collector_portless(
diagram: context.ContextDiagram,
params: dict[str, t.Any],
exchange_filter: cabc.Callable[
[
cabc.Iterable[fa.FunctionalExchange],
cabc.Iterable[fa.FunctionalExchange],
tuple[str, str],
],
cabc.Iterable[fa.FunctionalExchange],
],
attribute: str = "involved_activities",
) -> _elkjs.ELKInputData:
"""Collector function for the operational layer."""
data = makers.make_diagram(diagram)
activities = getattr(diagram.target, attribute)
filter = functools.partial(
exchange_filter,
functions=activities,
attributes=("source", "target"),
return _collect_data(
diagram, params, exchange_filter, **COLLECTOR_PARAMS[diagram.type]
)
made_edges: set[str] = set()
for act in activities:
data.children.append(act_box := makers.make_box(act))
connections = list(portless.get_exchanges(act, filter=filter))

in_act: dict[str, oa.OperationalActivity] = {}
out_act: dict[str, oa.OperationalActivity] = {}
for edge in connections:
if edge.source == act:
out_act.setdefault(edge.source.uuid, edge.source)
else:
in_act.setdefault(edge.target.uuid, edge.target)

act_box.height += (makers.PORT_SIZE + 2 * makers.PORT_PADDING) * max(
len(in_act), len(out_act)
)

ex_datas: list[generic.ExchangeData] = []
for ex in connections:
if ex.uuid in made_edges:
continue

ex_data = generic.ExchangeData(ex, data, diagram.filters, params)
generic.exchange_data_collector(ex_data)
made_edges.add(ex.uuid)
ex_datas.append(ex_data)

return data


def collector_default(
def _collect_data(
diagram: context.ContextDiagram,
params: dict[str, t.Any],
exchange_filter: cabc.Callable[
Expand All @@ -112,36 +72,45 @@ def collector_default(
],
cabc.Iterable[fa.FunctionalExchange],
],
attribute: str = "involved_functions",
attribute: str,
filter_attrs: tuple[str, str] = ("source", "target"),
port_collector: t.Optional[cabc.Callable] = None,
) -> _elkjs.ELKInputData:
"""Collector for all other layers than operational architecture."""
data = makers.make_diagram(diagram)
functions = getattr(diagram.target, attribute)
elements = getattr(diagram.target, attribute)
src_attr, trg_attr = filter_attrs
filter = functools.partial(
exchange_filter,
functions=functions,
attributes=("source.owner", "target.owner"),
functions=elements,
attributes=(src_attr, trg_attr),
)
made_edges: set[str] = set()
for fnc in functions:
data.children.append(fnc_box := makers.make_box(fnc))
_ports = default.port_collector(fnc, diagram.type)
connections = default.port_exchange_collector(_ports, filter=filter)
in_ports: dict[str, fa.FunctionPort] = {}
out_ports: dict[str, fa.FunctionPort] = {}
for edge in (edges := list(chain.from_iterable(connections.values()))):
if edge.source.owner == fnc:
out_ports.setdefault(edge.source.uuid, edge.source)
for elem in elements:
data.children.append(box := makers.make_box(elem))
if port_collector:
_ports = port_collector(elem, diagram.type)
connections = default.port_exchange_collector(
_ports, filter=filter
)
edges = list(chain.from_iterable(connections.values()))
else:
edges = list(portless.get_exchanges(elem, filter=filter))
in_elems: dict[str, fa.FunctionPort | oa.OperationalActivity] = {}
out_elems: dict[str, fa.FunctionPort | oa.OperationalActivity] = {}
for edge in edges:
if operator.attrgetter(src_attr) == elem:
out_elems.setdefault(edge.source.uuid, edge.source)
else:
in_ports.setdefault(edge.target.uuid, edge.target)

fnc_box.ports = [
makers.make_port(i.uuid) for i in (in_ports | out_ports).values()
]
fnc_box.height += (makers.PORT_SIZE + 2 * makers.PORT_PADDING) * max(
len(in_ports), len(out_ports)
in_elems.setdefault(edge.target.uuid, edge.target)

if port_collector:
box.ports = [
makers.make_port(i.uuid)
for i in (in_elems | out_elems).values()
]
box.height += (makers.PORT_SIZE + 2 * makers.PORT_PADDING) * max(
len(in_elems), len(out_elems)
)

ex_datas: list[generic.ExchangeData] = []
for ex in edges:
if ex.uuid in made_edges:
Expand All @@ -151,11 +120,5 @@ def collector_default(
generic.exchange_data_collector(ex_data)
made_edges.add(ex.uuid)
ex_datas.append(ex_data)
return data


COLLECTORS: dict[modeltypes.DiagramType, cabc.Callable] = {
modeltypes.DiagramType.OAIB: collector_portless,
modeltypes.DiagramType.SDFB: collector_default,
}
"""Collector registry."""
return data
2 changes: 1 addition & 1 deletion docs/realization_view.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
~ SPDX-License-Identifier: Apache-2.0
-->

# Tree View Diagram
# Realization View Diagram

With release
[`v0.5.42`](https://github.com/DSD-DBS/py-capellambse/releases/tag/v0.5.42) of
Expand Down

0 comments on commit add9739

Please sign in to comment.