diff --git a/capellambse/decl.py b/capellambse/decl.py index f4e2bb37d..2b0fa9e31 100644 --- a/capellambse/decl.py +++ b/capellambse/decl.py @@ -11,6 +11,7 @@ from __future__ import annotations __all__ = [ + "FindBy", "NewObject", "Promise", "UUIDReference", @@ -26,6 +27,7 @@ import collections.abc as cabc import contextlib import dataclasses +import operator import os import sys import typing as t @@ -109,13 +111,12 @@ def apply( instruction = instructions.popleft() parent = instruction.pop("parent") - if isinstance(parent, UUIDReference): - parent = model.by_uuid(parent.uuid) - elif isinstance(parent, Promise): - if parent in promises: - parent = promises[parent] - else: - deferred[parent].append({"parent": parent, **instruction}) + if isinstance(parent, (Promise, _ObjectFinder)): + try: + parent = _resolve(promises, model, parent) + except _UnresolvablePromise as p: + d = {"parent": parent, **instruction} + deferred[p.args[0]].append(d) continue if not isinstance(parent, capellambse.model.GenericElement): @@ -192,20 +193,22 @@ def _operate_delete( "Cannot delete object:" f" {type(parent).__name__}.{attr} is not model-coupled" ) - uuids = list(t.cast(cabc.Iterable[str], target.by_uuid)) for obj in objs: - if isinstance(obj, UUIDReference): - obj = obj.uuid - if not isinstance(obj, str): - raise TypeError("Values in `delete:*:` must be UUIDs") + if isinstance(obj, Promise): + raise ValueError("Cannot use !promise in `delete:*:`") + if isinstance(obj, str): + obj = UUIDReference(helpers.UUIDString(obj)) + obj = _resolve({}, parent, obj) try: - idx = uuids.index(obj) - except IndexError: - puuid = getattr(parent, "uuid", None) + idx = target.index(obj) + except ValueError: + if hasattr(parent, "_short_repr_"): + p_repr = parent._short_repr_() + else: + p_repr = repr(getattr(parent, "uuid", "")) raise ValueError( - f"No object with UUID {obj!r} in {attr!r} of {puuid!r}" + f"No object {obj._short_repr_()} in {attr!r} of {p_repr}" ) from None - del uuids[idx] del target[idx] return () @@ -217,7 +220,7 @@ def _operate_modify( modifications: dict[str, t.Any], ) -> cabc.Generator[_OperatorResult, t.Any, None]: for attr, value in modifications.items(): - if isinstance(value, (list, Promise, UUIDReference)): + if isinstance(value, (list, Promise, _ObjectFinder)): try: value = _resolve(promises, parent, value) except _UnresolvablePromise as p: @@ -293,7 +296,7 @@ def _operate_sync( def _resolve( promises: dict[Promise, capellambse.ModelObject], - parent: capellambse.ModelObject, + parent: capellambse.ModelObject | capellambse.MelodyModel, value: t.Any, ) -> t.Any: if isinstance(value, Promise): @@ -303,6 +306,8 @@ def _resolve( raise _UnresolvablePromise(value) from None elif isinstance(value, UUIDReference): return parent._model.by_uuid(value.uuid) + elif isinstance(value, FindBy): + return _resolve_findby(parent, None, value) elif isinstance(value, list): for i, v in enumerate(value): newv = _resolve(promises, parent, v) @@ -311,6 +316,67 @@ def _resolve( return value +def _resolve_findby( + parent: capellambse.ModelObject | capellambse.MelodyModel, + attr: str | None, + value: FindBy, +) -> capellambse.ModelObject: + attrs = dict(value.attributes) + typehint = attrs.pop("_type", None) + if not isinstance(typehint, (str, type(None))): + raise TypeError( + f"Expected a string for !find {{_type: ...}}," + f" got {type(typehint)}: {typehint!r}" + ) + if typehint is None: + wanted_types: tuple[type[t.Any], ...] = () + else: + wanted_types = common.find_wrapper(typehint) + if not wanted_types: + raise ValueError(f"Unknown type: {typehint}") + + if isinstance(parent, capellambse.MelodyModel): + candidates = parent.search(*wanted_types) + elif attr is not None: + candidates = getattr(parent, attr) + if wanted_types: + candidates = candidates.filter( + lambda i: isinstance(i, wanted_types) + ) + else: + candidates = parent._model.search() + + if attrs: + if len(attrs) > 1: + expected_values = tuple(attrs.values()) + else: + (expected_values,) = attrs.values() + getter = operator.attrgetter(*attrs) + + def do_filter(obj): + try: + real_values = getter(obj) + except AttributeError: + return False + return real_values == expected_values + + candidates = candidates.filter(do_filter) + + if len(candidates) > 1: + hint = "(Hint: did you mean '_type' instead of 'type'?)\n" * ( + "type" in value.attributes and "_type" not in value.attributes + ) + raise ValueError( + f"Ambiguous match directive: !find {value.attributes!r}\n" + + hint + + f"Found {len(candidates)} matches:\n" + + candidates._short_repr_() + ) + if not candidates: + raise ValueError(f"No object found for !find {value.attributes!r}") + return candidates[0] + + class _UnresolvablePromise(BaseException): pass @@ -351,7 +417,7 @@ def _create_complex_objects( ) for child in objs: if isinstance( - child, (common.GenericElement, list, Promise, UUIDReference) + child, (common.GenericElement, list, Promise, _ObjectFinder) ): try: obj = _resolve(promises, parent, child) @@ -443,6 +509,19 @@ def __post_init__(self) -> None: raise ValueError(f"Malformed `!uuid`: {self.uuid!r}") +@dataclasses.dataclass(frozen=True) +class FindBy: + """Find an object by specific attributes.""" + + attributes: cabc.Mapping[str, t.Any] + + +_ObjectFinder: tuple[type, ...] = ( + UUIDReference, + FindBy, +) + + class YDMDumper(yaml.SafeDumper): """A YAML dumper with extensions for declarative modelling.""" @@ -461,10 +540,16 @@ def represent_newobj(self, data: t.Any) -> yaml.Node: data._kw | {"_type": data._type_hint}, ) + def represent_findby(self, data: t.Any) -> yaml.Node: + assert isinstance(data, FindBy) + attrs = dict(data.attributes) + return self.represent_mapping("!find", attrs) + YDMDumper.add_representer(Promise, YDMDumper.represent_promise) YDMDumper.add_representer(UUIDReference, YDMDumper.represent_uuidref) YDMDumper.add_representer(NewObject, YDMDumper.represent_newobj) +YDMDumper.add_representer(FindBy, YDMDumper.represent_findby) class YDMLoader(yaml.SafeLoader): @@ -496,10 +581,17 @@ def construct_newobj(self, node: yaml.Node) -> NewObject: raise ValueError("!new_object requires a _type key") from None return NewObject(_type, **data) + def construct_findby(self, node: yaml.Node) -> FindBy: + if not isinstance(node, yaml.MappingNode): + raise TypeError("!find only accepts mapping nodes") + data = self.construct_mapping(node) + return FindBy(data) + YDMLoader.add_constructor("!promise", YDMLoader.construct_promise) YDMLoader.add_constructor("!uuid", YDMLoader.construct_uuidref) YDMLoader.add_constructor("!new_object", YDMLoader.construct_newobj) +YDMLoader.add_constructor("!find", YDMLoader.construct_findby) try: diff --git a/capellambse/model/common/__init__.py b/capellambse/model/common/__init__.py index 106640923..dd8ef28a0 100644 --- a/capellambse/model/common/__init__.py +++ b/capellambse/model/common/__init__.py @@ -59,6 +59,23 @@ def build_xtype(class_: type[ModelObject]) -> str: return f"{package}{module}:{clsname}" +def find_wrapper(typehint: str) -> tuple[type[ModelObject], ...]: + """Find the possible wrapper classes for the hinted type. + + The typehint is either a single class name, or a namespace prefix + and class name separated by ``:``. This function searches for all + known wrapper classes that match the given namespace prefix (if any) + and which have the given name, and returns them as a tuple. If no + matching wrapper classes are found, an empty tuple is returned. + """ + return tuple( + v + for i in XTYPE_HANDLERS.values() + for k, v in i.items() + if k.endswith(f":{typehint}") or k == typehint + ) + + def enumliteral( generic_element: GenericElement, attr: str, default: str = "NOT_SET" ) -> AttributeProperty | str: diff --git a/docs/source/start/declarative.rst b/docs/source/start/declarative.rst index 50c754b31..bad3bc469 100644 --- a/docs/source/start/declarative.rst +++ b/docs/source/start/declarative.rst @@ -77,14 +77,43 @@ different operations is applied to it: - ``sync``-ing objects into the model, or - ``delete``-ing one or more children. -Parents can be selected by their universally unique ID (UUID), using the -``!uuid`` YAML tag. The following query selects the root logical function in -our test model: +Selecting a parent +------------------ + +There are a few ways to select a parent object from the model. + +The most straight-forward way is to use the universally unique ID (UUID), using +the ``!uuid`` YAML tag. The following query selects the root logical function +in our test model: .. code-block:: yaml - parent: !uuid f28ec0f8-f3b3-43a0-8af7-79f194b29a2d +A more versatile way involves the ``!find`` YAML tag, which allows specifying a +set of attributes in order to filter down to a single model element. This tag +simply takes a mapping of all the attributes to select for. This usually also +involves the element's type (or class), which is selectable with the ``type`` +attribute: + +.. code-block:: yaml + + - parent: !find + _type: LogicalFunction + name: Root Logical Function + modify: [...] + +The ``!find`` tag also supports dot-notation for filtering on nested +attributes. + +.. code-block:: yaml + + - parent: !find + _type: FunctionOutputPort + name: FOP 1 + owner.name: manage the school + modify: [...] + Extending objects ----------------- diff --git a/tests/test_decl.py b/tests/test_decl.py index db4a24acd..683432c28 100644 --- a/tests/test_decl.py +++ b/tests/test_decl.py @@ -26,6 +26,7 @@ ROOT_COMPONENT = helpers.UUIDString("0d2edb8f-fa34-4e73-89ec-fb9a63001440") ROOT_FUNCTION = helpers.UUIDString("f28ec0f8-f3b3-43a0-8af7-79f194b29a2d") +TEACH_POTIONS_FUNC = helpers.UUIDString("83ba0220-54f2-48f7-bca1-cd87e39639f2") class TestDumpLoad: @@ -85,18 +86,19 @@ def test_decl_errors_on_unknown_operations( @staticmethod @pytest.mark.parametrize( - ["parent_str", "parent_getter"], + ["parent_str"], [ + pytest.param(f"!uuid {TEACH_POTIONS_FUNC}", id="!uuid"), pytest.param( - f"!uuid {ROOT_FUNCTION}", - lambda m: m.by_uuid(ROOT_FUNCTION), - id="!uuid", + "!find {_type: LogicalFunction, name: teach Potions}", + id="!find", ), ], ) def test_decl_finds_parent_to_act_on( - model: capellambse.MelodyModel, parent_str, parent_getter + model: capellambse.MelodyModel, parent_str ) -> None: + parent = model.by_uuid(TEACH_POTIONS_FUNC) funcname = "pass the unit test" yml = f"""\ - parent: {parent_str} @@ -105,13 +107,13 @@ def test_decl_finds_parent_to_act_on( - name: {funcname!r} """ expected_len = len(model.search()) + 1 - assert funcname not in parent_getter(model).functions.by_name + assert funcname not in parent.functions.by_name decl.apply(model, io.StringIO(yml)) actual_len = len(model.search()) assert actual_len == expected_len - assert funcname in parent_getter(model).functions.by_name + assert funcname in parent.functions.by_name @staticmethod def test_decl_creates_each_object_in_a_list(