Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a "sync" operation for declarative YAML #387

Merged
13 commits merged into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 198 additions & 18 deletions capellambse/decl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from __future__ import annotations

__all__ = [
"FindBy",
"NewObject",
"Promise",
"UUIDReference",
"UnfulfilledPromisesError",
Expand All @@ -25,16 +27,17 @@
import collections.abc as cabc
import contextlib
import dataclasses
import operator
import os
import sys
import typing as t
import warnings

import yaml

import capellambse
from capellambse import helpers
from capellambse.model import common
from capellambse.model import new_object as NewObject

FileOrPath = t.Union[t.IO[str], str, os.PathLike[t.Any]]
_FutureAction = dict[str, t.Any]
Expand Down Expand Up @@ -106,9 +109,22 @@ def apply(

while instructions:
instruction = instructions.popleft()

parent = instruction.pop("parent")
if isinstance(parent, UUIDReference):
parent = model.by_uuid(parent.uuid)
if isinstance(parent, (Promise, _ObjectFinder)):
try:
parent = _resolve(promises, model, parent)
except _UnresolvablePromise as p:
d = {"parent": parent, **instruction}
deferred[p.args[0]].append(d)
continue

if not isinstance(parent, capellambse.model.GenericElement):
raise TypeError(
"Expected a model object as parent, found "
f"{type(parent).__name__}"
)

for op_type, apply_op in _OPERATIONS.items():
try:
op = instruction.pop(op_type)
Expand Down Expand Up @@ -137,7 +153,6 @@ def _operate_create(
parent: capellambse.ModelObject,
creations: dict[str, t.Any],
) -> cabc.Generator[_OperatorResult, t.Any, None]:
warnings.warn("Use 'extend' instead of 'create' in declarative YAML")
yield from _operate_extend(promises, parent, creations)


Expand Down Expand Up @@ -178,20 +193,22 @@ def _operate_delete(
"Cannot delete object:"
f" {type(parent).__name__}.{attr} is not model-coupled"
)
uuids = list(t.cast(cabc.Iterable[str], target.by_uuid))
for obj in objs:
if isinstance(obj, UUIDReference):
obj = obj.uuid
if not isinstance(obj, str):
raise TypeError("Values in `delete:*:` must be UUIDs")
if isinstance(obj, Promise):
raise ValueError("Cannot use !promise in `delete:*:`")
if isinstance(obj, str):
obj = UUIDReference(helpers.UUIDString(obj))
obj = _resolve({}, parent, obj)
try:
idx = uuids.index(obj)
except IndexError:
puuid = getattr(parent, "uuid", None)
idx = target.index(obj)
except ValueError:
if hasattr(parent, "_short_repr_"):
p_repr = parent._short_repr_()
else:
p_repr = repr(getattr(parent, "uuid", "<unknown>"))
raise ValueError(
f"No object with UUID {obj!r} in {attr!r} of {puuid!r}"
f"No object {obj._short_repr_()} in {attr!r} of {p_repr}"
) from None
del uuids[idx]
del target[idx]

return ()
Expand All @@ -203,15 +220,15 @@ def _operate_modify(
modifications: dict[str, t.Any],
) -> cabc.Generator[_OperatorResult, t.Any, None]:
for attr, value in modifications.items():
if isinstance(value, (list, Promise, UUIDReference)):
if isinstance(value, (list, Promise, _ObjectFinder)):
try:
value = _resolve(promises, parent, value)
except _UnresolvablePromise as p:
yield p.args[0], {"parent": parent, "modify": {attr: value}}
continue

if isinstance(value, list):
delattr(parent, attr)
getattr(parent, attr).clear()
yield from _create_complex_objects(promises, parent, attr, value)
elif isinstance(value, dict):
obj = getattr(parent, attr)
Expand All @@ -221,9 +238,55 @@ def _operate_modify(
setattr(parent, attr, value)


def _resolve(
def _operate_sync(
promises: dict[Promise, capellambse.ModelObject],
parent: capellambse.ModelObject,
modifications: dict[str, t.Any],
) -> cabc.Generator[_OperatorResult, t.Any, None]:
for attr, value in modifications.items():
if not isinstance(value, cabc.Iterable):
raise TypeError("values below `extend:*:` must be lists")

for obj in value:
try:
find_args = obj["find"]
except KeyError:
raise ValueError(
"Expected `find` key in sync object"
) from None

try:
candidate = _resolve_findby(parent, attr, FindBy(find_args))
except _NoObjectFoundError:
candidate = None

if candidate is not None:
if sync := obj.pop("sync", None):
yield from _operate_sync(promises, candidate, sync)
if mods := obj.pop("set", None):
yield from _operate_modify(promises, candidate, mods)
if ext := obj.pop("extend", None):
yield from _operate_extend(promises, candidate, ext)
promise: str | Promise | None = obj.get("promise_id")
if promise is not None:
if isinstance(promise, str):
promise = Promise(promise)
yield (promise, candidate)
else:
newobj_props = (
find_args | obj.pop("set", {}) | obj.pop("extend", {})
)
if "promise_id" in obj:
newobj_props["promise_id"] = obj.pop("promise_id")
yield from _create_complex_objects(
promises, parent, attr, [newobj_props]
)
yield from _operate_sync(promises, parent, {attr: [obj]})


def _resolve(
promises: dict[Promise, capellambse.ModelObject],
parent: capellambse.ModelObject | capellambse.MelodyModel,
value: t.Any,
) -> t.Any:
if isinstance(value, Promise):
Expand All @@ -233,6 +296,8 @@ def _resolve(
raise _UnresolvablePromise(value) from None
elif isinstance(value, UUIDReference):
return parent._model.by_uuid(value.uuid)
elif isinstance(value, FindBy):
return _resolve_findby(parent, None, value)
elif isinstance(value, list):
for i, v in enumerate(value):
newv = _resolve(promises, parent, v)
Expand All @@ -241,15 +306,83 @@ def _resolve(
return value


def _resolve_findby(
parent: capellambse.ModelObject | capellambse.MelodyModel,
attr: str | None,
value: FindBy,
) -> capellambse.ModelObject:
attrs = dict(value.attributes)
typehint = attrs.pop("_type", None)
if not isinstance(typehint, (str, type(None))):
raise TypeError(
f"Expected a string for !find {{_type: ...}},"
f" got {type(typehint)}: {typehint!r}"
)
if typehint is None:
wanted_types: tuple[type[t.Any], ...] = ()
else:
wanted_types = common.find_wrapper(typehint)
if not wanted_types:
raise ValueError(f"Unknown type: {typehint}")

if isinstance(parent, capellambse.MelodyModel):
candidates = parent.search(*wanted_types)
elif attr is not None:
candidates = getattr(parent, attr)
if wanted_types:
candidates = candidates.filter(
lambda i: isinstance(i, wanted_types)
)
else:
candidates = parent._model.search()

if attrs:
if len(attrs) > 1:
expected_values = tuple(attrs.values())
else:
(expected_values,) = attrs.values()
getter = operator.attrgetter(*attrs)

def do_filter(obj):
try:
real_values = getter(obj)
except AttributeError:
return False
return real_values == expected_values

candidates = candidates.filter(do_filter)

if len(candidates) > 1:
hint = "(Hint: did you mean '_type' instead of 'type'?)\n" * (
"type" in value.attributes and "_type" not in value.attributes
)
raise ValueError(
f"Ambiguous match directive: !find {value.attributes!r}\n"
+ hint
+ f"Found {len(candidates)} matches:\n"
+ candidates._short_repr_()
)
if not candidates:
raise _NoObjectFoundError(
f"No object found for !find {value.attributes!r}"
)
return candidates[0]


class _UnresolvablePromise(BaseException):
pass


class _NoObjectFoundError(ValueError):
pass


_OPERATIONS = collections.OrderedDict(
(
("create", _operate_create),
("extend", _operate_extend),
("modify", _operate_modify),
("sync", _operate_sync),
("delete", _operate_delete),
)
)
Expand Down Expand Up @@ -280,7 +413,7 @@ def _create_complex_objects(
)
for child in objs:
if isinstance(
child, (common.GenericElement, list, Promise, UUIDReference)
child, (common.GenericElement, list, Promise, _ObjectFinder)
):
try:
obj = _resolve(promises, parent, child)
Expand Down Expand Up @@ -372,6 +505,19 @@ def __post_init__(self) -> None:
raise ValueError(f"Malformed `!uuid`: {self.uuid!r}")


@dataclasses.dataclass(frozen=True)
class FindBy:
"""Find an object by specific attributes."""

attributes: cabc.Mapping[str, t.Any]


_ObjectFinder: tuple[type, ...] = (
UUIDReference,
FindBy,
)


class YDMDumper(yaml.SafeDumper):
"""A YAML dumper with extensions for declarative modelling."""

Expand All @@ -383,9 +529,25 @@ def represent_uuidref(self, data: t.Any) -> yaml.Node:
assert isinstance(data, UUIDReference)
return self.represent_scalar("!uuid", data.uuid)

def represent_newobj(self, data: t.Any) -> yaml.Node:
assert isinstance(data, NewObject)
attrs = dict(data._kw)
if len(data._type_hint) > 1:
raise TypeError("Cannot use more than one type hint")
if len(data._type_hint) == 1:
attrs["_type"] = data._type_hint[0]
return self.represent_mapping("!new_object", attrs)

def represent_findby(self, data: t.Any) -> yaml.Node:
assert isinstance(data, FindBy)
attrs = dict(data.attributes)
return self.represent_mapping("!find", attrs)


YDMDumper.add_representer(Promise, YDMDumper.represent_promise)
YDMDumper.add_representer(UUIDReference, YDMDumper.represent_uuidref)
YDMDumper.add_representer(NewObject, YDMDumper.represent_newobj)
YDMDumper.add_representer(FindBy, YDMDumper.represent_findby)


class YDMLoader(yaml.SafeLoader):
Expand All @@ -407,9 +569,27 @@ def construct_uuidref(self, node: yaml.Node) -> UUIDReference:
raise ValueError(f"Not a well-formed UUID string: {data}")
return UUIDReference(data)

def construct_newobj(self, node: yaml.Node) -> NewObject:
if not isinstance(node, yaml.MappingNode):
raise TypeError("!new_object only accepts mapping nodes")
data = self.construct_mapping(node)
try:
_type = data.pop("_type")
except KeyError:
raise ValueError("!new_object requires a _type key") from None
return NewObject(_type, **data)

def construct_findby(self, node: yaml.Node) -> FindBy:
if not isinstance(node, yaml.MappingNode):
raise TypeError("!find only accepts mapping nodes")
data = self.construct_mapping(node)
return FindBy(data)


YDMLoader.add_constructor("!promise", YDMLoader.construct_promise)
YDMLoader.add_constructor("!uuid", YDMLoader.construct_uuidref)
YDMLoader.add_constructor("!new_object", YDMLoader.construct_newobj)
YDMLoader.add_constructor("!find", YDMLoader.construct_findby)


try:
Expand Down
17 changes: 17 additions & 0 deletions capellambse/model/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ def build_xtype(class_: type[ModelObject]) -> str:
return f"{package}{module}:{clsname}"


def find_wrapper(typehint: str) -> tuple[type[ModelObject], ...]:
"""Find the possible wrapper classes for the hinted type.

The typehint is either a single class name, or a namespace prefix
and class name separated by ``:``. This function searches for all
known wrapper classes that match the given namespace prefix (if any)
and which have the given name, and returns them as a tuple. If no
matching wrapper classes are found, an empty tuple is returned.
"""
return tuple(
v
for i in XTYPE_HANDLERS.values()
for k, v in i.items()
if k.endswith(f":{typehint}") or k == typehint
)


def enumliteral(
generic_element: GenericElement, attr: str, default: str = "NOT_SET"
) -> AttributeProperty | str:
Expand Down
23 changes: 22 additions & 1 deletion capellambse/model/common/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,28 @@ def delete(
elmlist: ElementListCouplingMixin,
obj: element.ModelObject,
) -> None:
raise NotImplementedError("NYI")
assert obj._model is elmlist._model
model = obj._model
all_elements = list(model._loader.iterdescendants_xt(obj._element)) + [
obj._element
]
with contextlib.ExitStack() as stack:
for elm in all_elements:
if elm.get("id") is None:
continue

obj = element.GenericElement.from_model(model, elm)
for ref, attr, _ in model.find_references(obj):
acc = getattr(type(ref), attr)
if acc is self or not isinstance(acc, WritableAccessor):
continue
stack.enter_context(acc.purge_references(ref, obj))

elm = obj._element
parent = elm.getparent()
assert parent is not None
model._loader.idcache_remove(elm)
parent.remove(elm)

@contextlib.contextmanager
def purge_references(
Expand Down
Loading