From 5618d98511c92c37230972d4e2243a7215b84edf Mon Sep 17 00:00:00 2001 From: ewuerger Date: Fri, 26 Jan 2024 14:55:50 +0100 Subject: [PATCH] feat(dataflow-view): First version for Capability --- capellambse_context_diagrams/__init__.py | 12 ++++ .../collectors/dataflow_view.py | 72 +++++++++++++++++++ .../collectors/default.py | 9 ++- .../collectors/makers.py | 4 +- capellambse_context_diagrams/context.py | 57 ++++++++++++++- 5 files changed, 149 insertions(+), 5 deletions(-) create mode 100644 capellambse_context_diagrams/collectors/dataflow_view.py diff --git a/capellambse_context_diagrams/__init__.py b/capellambse_context_diagrams/__init__.py index 00ced73a..9ebbcc04 100644 --- a/capellambse_context_diagrams/__init__.py +++ b/capellambse_context_diagrams/__init__.py @@ -52,6 +52,7 @@ def init() -> None: register_interface_context() register_tree_view() register_realization_view() + register_data_flow_view() # register_functional_context() XXX: Future @@ -201,3 +202,14 @@ def register_realization_view() -> None: "marker-end": "FineArrowMark", "stroke-dasharray": "5", } + + +def register_data_flow_view() -> None: + supported_classes: list[ClassPair] = [ + (oa.OperationalCapability, DiagramType.OAIB, {}), # portless + (ctx.Capability, DiagramType.SDFB, {}), # default + ] + class_: type[common.GenericElement] + for class_, dgcls, default_render_params in supported_classes: + accessor = context.DataFlowAccessor(dgcls.value, default_render_params) + common.set_accessor(class_, "data_flow_view", accessor) diff --git a/capellambse_context_diagrams/collectors/dataflow_view.py b/capellambse_context_diagrams/collectors/dataflow_view.py new file mode 100644 index 00000000..17a440c6 --- /dev/null +++ b/capellambse_context_diagrams/collectors/dataflow_view.py @@ -0,0 +1,72 @@ +# SPDX-FileCopyrightText: 2022 Copyright DB InfraGO AG and the capellambse-context-diagrams contributors +# SPDX-License-Identifier: Apache-2.0 + +"""...""" +from __future__ import annotations + +import collections.abc as cabc +import typing as t + +from capellambse import helpers +from capellambse.model import common +from capellambse.model.crosslayer import fa +from capellambse.model.layers import ctx, oa + +from .. import _elkjs, context +from . import default, generic, makers, portless + + +def collector( + diagram: context.ContextDiagram, + params: dict[str, t.Any], + attribute: str = "involved_functions", + exchange_filter: cabc.Callable[ + [cabc.Iterable[fa.FunctionalExchange]], + cabc.Iterable[fa.FunctionalExchange], + ] = lambda exs: exs, +) -> _elkjs.ELKInputData: + data = makers.make_diagram(diagram) + functions = getattr(diagram.target, attribute) + made_edges: set[str] = set() + for fnc in functions: + data["children"].append(fnc_box := makers.make_box(fnc)) + _ports = default.port_collector(fnc, diagram.type) + connections = default.port_exchange_collector( + _ports, filter=exchange_filter + ) + in_ports: dict[str, fa.FunctionPort] = {} + out_ports: dict[str, fa.FunctionPort] = {} + for edge in connections: + if edge.source.owner == fnc: + out_ports.setdefault(edge.source.uuid, edge.source) + else: + in_ports.setdefault(edge.target.uuid, edge.target) + + fnc_box["ports"] = [ + makers.make_port(i.uuid) for i in (in_ports | out_ports).values() + ] + fnc_box["height"] += ( + makers.PORT_SIZE + 2 * makers.PORT_PADDING + ) * max(len(in_ports), len(out_ports)) + + ex_datas: list[generic.ExchangeData] = [] + for ex in connections: + if ex.uuid in made_edges: + continue + + ex_data = generic.ExchangeData(ex, data, diagram.filters, params) + generic.exchange_data_collector(ex_data) + made_edges.add(ex.uuid) + ex_datas.append(ex_data) + return data + + +def only_involved( + exchanges: cabc.Iterable[fa.FunctionalExchange], + functions: cabc.Iterable[fa.FunctionalExchange], +) -> cabc.Iterable[fa.FunctionalExchange]: + return [ + ex + for ex in exchanges + if ex.source.owner in functions and ex.target.owner in functions + ] diff --git a/capellambse_context_diagrams/collectors/default.py b/capellambse_context_diagrams/collectors/default.py index 10127e41..ec9186c8 100644 --- a/capellambse_context_diagrams/collectors/default.py +++ b/capellambse_context_diagrams/collectors/default.py @@ -27,7 +27,7 @@ def collector( centerbox = data["children"][0] centerbox["ports"] = [makers.make_port(i.uuid) for i in ports] connections = port_exchange_collector(ports) - ex_datas = list[generic.ExchangeData]() + ex_datas: list[generic.ExchangeData] = [] for ex in connections: if is_hierarchical := exchanges.is_hierarchical(ex, centerbox): if not diagram.include_inner_objects: @@ -118,12 +118,17 @@ def __collect(target): def port_exchange_collector( ports: t.Iterable[common.GenericElement], + filter: cabc.Callable[ + [cabc.Iterable[common.GenericElement]], + cabc.Iterable[common.GenericElement], + ] = lambda i: i, ) -> list[common.GenericElement]: """Collect exchanges from `ports` savely.""" edges: list[common.GenericElement] = [] for i in ports: try: - edges.extend(getattr(i, "exchanges")) + filtered = filter(getattr(i, "exchanges")) + edges.extend(filtered) except AttributeError: pass return edges diff --git a/capellambse_context_diagrams/collectors/makers.py b/capellambse_context_diagrams/collectors/makers.py index 605baa42..762e3946 100644 --- a/capellambse_context_diagrams/collectors/makers.py +++ b/capellambse_context_diagrams/collectors/makers.py @@ -135,8 +135,8 @@ def calculate_height_and_width( ) -> tuple[int | float, int | float]: """Calculate the size (width and height) from given labels for a box.""" icon = icon_size + icon_padding * 2 - _height = sum(label["height"] for label in labels) + icon - min_width = max(label["width"] for label in labels) + icon + _height = sum(label["height"] + 2 * LABEL_VPAD for label in labels) + icon + min_width = max(label["width"] + 2 * LABEL_HPAD for label in labels) + icon width = min_width if slim_width else max(width, min_width) return width, max(height, _height) diff --git a/capellambse_context_diagrams/context.py b/capellambse_context_diagrams/context.py index 481a7091..16182a22 100644 --- a/capellambse_context_diagrams/context.py +++ b/capellambse_context_diagrams/context.py @@ -8,6 +8,7 @@ import collections.abc as cabc import copy +import functools import json import logging import typing as t @@ -17,7 +18,13 @@ from capellambse.model import common, diagram, modeltypes from . import _elkjs, filters, serializers, styling -from .collectors import exchanges, get_elkdata, realization_view, tree_view +from .collectors import ( + dataflow_view, + exchanges, + get_elkdata, + realization_view, + tree_view, +) logger = logging.getLogger(__name__) @@ -184,6 +191,27 @@ def __get__( # type: ignore return self._get(obj, RealizationViewDiagram, "{}_realization_view") +class DataFlowAccessor(ContextAccessor): + # pylint: disable=super-init-not-called + def __init__( + self, diagclass: str, render_params: dict[str, t.Any] | None = None + ) -> None: + self._dgcls = diagclass + self._default_render_params = render_params or {} + + def __get__( # type: ignore + self, + obj: common.T | None, + objtype: type | None = None, + ) -> common.Accessor | ContextDiagram: + """Make a DataFlowViewDiagram for the given model object.""" + del objtype + if obj is None: # pragma: no cover + return self + assert isinstance(obj, common.GenericElement) + return self._get(obj, DataFlowViewDiagram, "{}_data_flow_view") + + class ContextDiagram(diagram.AbstractDiagram): """An automatically generated context diagram. @@ -480,6 +508,33 @@ def _add_layer_labels(self, layout: _elkjs.ELKOutputData) -> None: layer["style"] = {"stroke": "grey", "rx": 5, "ry": 5} +class DataFlowViewDiagram(ContextDiagram): + """An automatically generated DataFlowViewDiagram.""" + + def __init__(self, class_: str, obj: common.GenericElement, **kw) -> None: + super().__init__(class_, obj, **kw, display_symbols_as_boxes=True) + + @property + def uuid(self) -> str: # type: ignore + """Returns the UUID of the diagram.""" + return f"{self.target.uuid}_data_flow_view" + + @property + def name(self) -> str: # type: ignore + """Returns the name of the diagram.""" + return f"DatFlow view of {self.target.name}" + + def _create_diagram(self, params: dict[str, t.Any]) -> cdiagram.Diagram: + filter = functools.partial( + dataflow_view.only_involved, + functions=self.target.involved_functions, + ) + params["elkdata"] = dataflow_view.collector( + self, params, exchange_filter=filter + ) + return super()._create_diagram(params) + + def try_to_layout(data: _elkjs.ELKInputData) -> _elkjs.ELKOutputData: """Try calling elkjs, raise a JSONDecodeError if it fails.""" try: