diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 81ce8ed11..66d976532 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -106,6 +106,7 @@ repos: additional_dependencies: - mypypp==0.1.1 + - awesomeversion==24.2.0 - click==8.1.7 - diskcache==5.0 - jinja2==3.1.3 diff --git a/capellambse/decl.py b/capellambse/decl.py index 05e625d40..276a55296 100644 --- a/capellambse/decl.py +++ b/capellambse/decl.py @@ -21,17 +21,28 @@ "apply", "dump", "load", + # Metadata handling + "Metadata", + "ModelMetadata", + "WriterMetadata", + "load_with_metadata", ] import collections import collections.abc as cabc import contextlib import dataclasses +import importlib.metadata as imm +import logging import operator import os +import pathlib +import re import sys import typing as t +import awesomeversion as av +import typing_extensions as te import yaml import capellambse @@ -45,11 +56,66 @@ "Promise", capellambse.ModelObject | _FutureAction, ] - - -def dump(instructions: cabc.Sequence[cabc.Mapping[str, t.Any]]) -> str: - """Dump an instruction stream to YAML.""" - return yaml.dump(instructions, Dumper=YDMDumper) +logger = logging.getLogger(__name__) + + +class WriterMetadata(t.TypedDict): + capellambse: str + generator: te.NotRequired[str] + + +class ModelMetadata(t.TypedDict): + url: str + revision: te.NotRequired[str] + entrypoint: str + + +class Metadata(t.TypedDict, total=False): + written_by: WriterMetadata + model: ModelMetadata + + +@t.overload +def dump( + instructions: cabc.Sequence[cabc.Mapping[str, t.Any]], + *, + metadata: Metadata | None = None, +) -> str: ... +@t.overload +def dump( + instructions: cabc.Sequence[cabc.Mapping[str, t.Any]], + *, + metadata: m.MelodyModel, + generator: str | None = None, +) -> str: ... +def dump( + instructions: cabc.Sequence[cabc.Mapping[str, t.Any]], + *, + metadata: m.MelodyModel | Metadata | None = None, + generator: str | None = None, +) -> str: + """Dump an instruction stream to YAML. + + Optionally dump metadata with the instruction stream to YAML. + """ + if isinstance(metadata, m.MelodyModel): + res_info = metadata.info.resources["\x00"] + metadata = { + "model": { + "url": res_info.url, + "revision": res_info.rev_hash, + "entrypoint": str(metadata.info.entrypoint), + }, + "written_by": { + "capellambse": capellambse.__version__.split("+", 1)[0] + }, + } + if generator is not None: + metadata["written_by"]["generator"] = generator + + if not metadata: + return yaml.dump(instructions, Dumper=YDMDumper) + return yaml.dump_all([metadata, instructions], Dumper=YDMDumper) def load(file: FileOrPath) -> list[dict[str, t.Any]]: @@ -58,8 +124,36 @@ def load(file: FileOrPath) -> list[dict[str, t.Any]]: Parameters ---------- file - An open file-like object, or a path or PathLike pointing to such - a file. Files are expected to use UTF-8 encoding. + An open file-like object containing decl instructions, or a path + or PathLike pointing to such a file. Files are expected to use + UTF-8 encoding. + """ + _, instructions = load_with_metadata(file) + return instructions + + +def load_with_metadata( + file: FileOrPath, +) -> tuple[Metadata, list[dict[str, t.Any]]]: + """Load an instruction stream and its metadata from a YAML file. + + If the file does not have a metadata section, an empty dict will be + returned. + + Parameters + ---------- + file + An open file-like object containing decl instructions, or a path + or PathLike pointing to such a file. Files are expected to use + UTF-8 encoding. + + Returns + ------- + dict[str, Any] + The metadata read from the file, or an empty dictionary if the + file did not contain any metadata. + list[dict[str, Any]] + The instruction stream. """ if hasattr(file, "read"): file = t.cast(t.IO[str], file) @@ -69,11 +163,24 @@ def load(file: FileOrPath) -> list[dict[str, t.Any]]: ctx = open(file, encoding="utf-8") # noqa: SIM115 with ctx as opened_file: - return yaml.load(opened_file, Loader=YDMLoader) + contents = list(yaml.load_all(opened_file, Loader=YDMLoader)) + + if len(contents) == 2: + return (t.cast(Metadata, contents[0]) or {}, contents[1] or []) + if len(contents) == 1: + return ({}, contents[0] or []) + if len(contents) == 0: + return ({}, []) + raise ValueError( + f"Expected a YAML file with 1 or 2 documents, found {len(contents)}" + ) def apply( - model: capellambse.MelodyModel, file: FileOrPath + model: capellambse.MelodyModel, + file: FileOrPath, + *, + strict: bool = False, ) -> dict[Promise, capellambse.ModelObject]: """Apply a declarative modelling file to the given model. @@ -88,6 +195,9 @@ def apply( The full format of these files is documented in the :ref:`section about declarative modelling `. + strict + Verify metadata contained in the file against the used model, + and raise an error if they don't match. Notes ----- @@ -103,10 +213,18 @@ def apply( ``!promise``, but reorderings are still possible even if no promises are used in an input document. """ - instructions = collections.deque(load(file)) + metadata, raw_instructions = load_with_metadata(file) + instructions = collections.deque(raw_instructions) promises = dict[Promise, capellambse.ModelObject]() deferred = collections.defaultdict[Promise, list[_FutureAction]](list) + try: + _verify_metadata(model, metadata) + except ValueError as err: + if strict: + raise + logger.warning("Metadata does not match provided model: %s", err) + while instructions: instruction = instructions.popleft() @@ -148,6 +266,80 @@ def apply( return promises +def _verify_metadata( + model: capellambse.MelodyModel, metadata: Metadata +) -> None: + if not metadata: + raise ValueError("Cannot verify decl metadata: No metadata found") + + written_by = metadata.get("written_by", {}).get("capellambse", "") + if not written_by: + raise ValueError( + "Unsupported YAML: Can't find 'written_by:capellambse' in metadata" + ) + if not _is_pep440(written_by): + raise ValueError(f"Malformed version number in metadata: {written_by}") + + current = av.AwesomeVersion( + imm.version("capellambse").partition("+")[0], + ensure_strategy=av.AwesomeVersionStrategy.PEP440, + ) + try: + written_version = av.AwesomeVersion( + written_by, + ensure_strategy=av.AwesomeVersionStrategy.PEP440, + ) + version_matches = current >= written_version + except Exception as err: + raise ValueError( + "Cannot apply decl: Cannot verify required capellambse version:" + f" {type(err).__name__}: {err}" + ) from None + + if not version_matches: + raise ValueError( + "Cannot apply decl: This capellambse is too old for this YAML:" + f" Need at least v{written_by}, but have only v{current})" + ) + + model_metadata = metadata.get("model", {}) + res_info = model.info.resources["\x00"] + url = model_metadata.get("url") + if url != res_info.url: + raise ValueError( + "Cannot apply decl: Model URL mismatch:" + f" YAML expects {url}, current is {res_info.url}" + ) + + hash = model_metadata.get("revision") + if hash != res_info.rev_hash: + raise ValueError( + "Cannot apply decl: Model version mismatch:" + f" YAML expects {hash}, current is {res_info.rev_hash}" + ) + + entrypoint = pathlib.PurePosixPath(model_metadata.get("entrypoint", "")) + if entrypoint != model.info.entrypoint: + raise ValueError( + "Cannot apply decl: Model entrypoint mismatch:" + f" YAML expects {entrypoint}, current is {model.info.entrypoint}" + ) + + +def _is_pep440(version: str) -> bool: + """Check if given version aligns with PEP440. + + See Also + -------- + https://peps.python.org/pep-0440/#appendix-b-parsing-version-strings-with-regular-expressions + """ + pep440_ptrn = re.compile( + r"([1-9][0-9]*!)?(0|[1-9][0-9]*)(\.(0|[1-9][0-9]*))*((a|b|rc)" + r"(0|[1-9][0-9]*))?(\.post(0|[1-9][0-9]*))?(\.dev(0|[1-9][0-9]*))?" + ) + return pep440_ptrn.fullmatch(version) is not None + + def _operate_create( promises: dict[Promise, capellambse.ModelObject], parent: capellambse.ModelObject, @@ -614,10 +806,15 @@ def _main() -> None: @click.command() @click.option("-m", "--model", type=capellambse.ModelCLI(), required=True) + @click.option("-s", "--strict/--relaxed", is_flag=True, default=False) @click.argument("file", type=click.File("r")) - def _main(model: capellambse.MelodyModel, file: t.IO[str]) -> None: + def _main( + model: capellambse.MelodyModel, + file: t.IO[str], + strict: bool, + ) -> None: """Apply a declarative modelling YAML file to a model.""" - apply(model, file) + apply(model, file, strict=strict) model.save() diff --git a/docs/source/start/declarative.rst b/docs/source/start/declarative.rst index 7cb1b7205..1e8eb3737 100644 --- a/docs/source/start/declarative.rst +++ b/docs/source/start/declarative.rst @@ -68,9 +68,42 @@ containing YAML, wrap it in :external:class:`io.StringIO`: Format description ================== -The expected YAML follows a simple format, where a parent object (i.e. an -object that already exists in the model) is selected, and one or more of three -different operations is applied to it: +The YAML file may contain one or two YAML documents (separated by a line +containing only three minus signs ``---``). The first document contains +metadata, while the second document contains the instructions to perform +against the model. The metadata document may be omitted, in which case the file +only contains an instruction stream. + +Metadata +-------- + +.. versionadded:: 0.6.8 + Added metadata section to the declarative modelling YAML. + +The metadata section is optional and has the following format: + +.. code-block:: yaml + + model: + url: https://example.com/model.git + revision: 0123456789abcdefdeadbeef0123456789abcdef + entrypoint: path/to/model.aird + written_by: + capellambse_version: 1.0.0 + generator: Example Generator 1.0.0 + +It contains information about which model the declarative modelling YAML file +wants to change, and which capellambse version and generator it was written +with. A versioned model can be uniquely identified by its repository URL, the +revision, and the model entrypoint. ``decl.apply()`` with ``strict=True`` will +verify these values against the ``model.info`` of the passed model. + +Instructions +------------ + +The expected instruction document in the YAML follows a simple format, where a +parent object (i.e. an object that already exists in the model) is selected, +and one or more of three different operations is applied to it: - ``extend``-ing the object on list attributes, - ``set``-ting properties on the object itself, diff --git a/pyproject.toml b/pyproject.toml index b247ca87a..303d9e95e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ classifiers = [ "Typing :: Typed", ] dependencies = [ + "awesomeversion>=24.2.0", "diskcache>=5.0", "lxml>=4.5.0", "markupsafe>=2.0", diff --git a/tests/test_decl.py b/tests/test_decl.py index 976ddaca1..652227f0c 100644 --- a/tests/test_decl.py +++ b/tests/test_decl.py @@ -8,6 +8,7 @@ import shutil import subprocess import sys +import textwrap import pytest @@ -15,6 +16,7 @@ import capellambse.model as m from capellambse import decl, helpers from capellambse.extensions import reqif +from capellambse.filehandler import git from .conftest import ( # type: ignore[import-untyped] INSTALLED_PACKAGE, @@ -71,6 +73,37 @@ def test_uuid_tags_are_deserialized_as_uuidreference() -> None: assert actual == expected + @staticmethod + def test_loading_with_metadata(): + yml = textwrap.dedent( + """\ + written_by: + capellambse_version: 1.0.0 + generator: Example 1.0.0 + model: + version: "12345678" + url: https://example.invalid + entrypoint: path/to/model.aird + --- + [] + """ + ) + expected = { + "written_by": { + "capellambse_version": "1.0.0", + "generator": "Example 1.0.0", + }, + "model": { + "url": "https://example.invalid", + "version": "12345678", + "entrypoint": "path/to/model.aird", + }, + } + + actual, _ = decl.load_with_metadata(io.StringIO(yml)) + + assert actual == expected + class TestApplyExtend: @staticmethod @@ -832,6 +865,203 @@ def test_sync_operation_can_resolve_multiple_promises_with_one_object( } +class TestStrictMetadata: + @staticmethod + def test_strict_apply_requires_nonempty_metadata(model: m.MelodyModel): + yml = textwrap.dedent( + """\ + {} + --- + [] + """ + ) + + with pytest.raises(ValueError, match="No metadata found$"): + decl.apply(model, io.StringIO(yml), strict=True) + + @staticmethod + def test_capellambse_version_outdated(model: m.MelodyModel, monkeypatch): + modelinfo = m.ModelInfo( + url="git+https://decl-yaml.invalid/ignored-anyway.git", + title="Testmodel", + entrypoint=pathlib.PurePosixPath("testmodel.aird"), + resources={ + "\x00": git.GitHandlerInfo( + url="git+https://decl-yaml.invalid/testmodel.git", + branch="master", + revision="Baseline-1.0", + rev_hash="0000000000000000000000000000000000000000", + ) + }, + capella_version="5.0.0", + viewpoints={"org.polarsys.capella.core.viewpoint": "5.0.0"}, + diagram_cache=None, + ) + monkeypatch.setattr(m.MelodyModel, "info", modelinfo) + + yml = textwrap.dedent( + """\ + written_by: + capellambse: '99999.99.99' + model: + url: 'git+https://decl-yaml.invalid/testmodel.git' + revision: '0000000000000000000000000000000000000000' + entrypoint: 'testmodel.aird' + --- + [] + """ + ) + + with pytest.raises(ValueError, match=r"99999\.99\.99"): + decl.apply(model, io.StringIO(yml), strict=True) + + @staticmethod + def test_declared_capellambse_version_malformed( + model: m.MelodyModel, monkeypatch + ): + modelinfo = m.ModelInfo( + url="git+https://decl-yaml.invalid/ignored-anyway.git", + title="Testmodel", + entrypoint=pathlib.PurePosixPath("testmodel.aird"), + resources={ + "\x00": git.GitHandlerInfo( + url="git+https://decl-yaml.invalid/testmodel.git", + branch="master", + revision="Baseline-1.0", + rev_hash="0000000000000000000000000000000000000000", + ) + }, + capella_version="5.0.0", + viewpoints={"org.polarsys.capella.core.viewpoint": "5.0.0"}, + diagram_cache=None, + ) + monkeypatch.setattr(m.MelodyModel, "info", modelinfo) + + yml = textwrap.dedent( + """\ + written_by: + capellambse: 'this-is-not-a-pep440-version' + model: + url: 'git+https://decl-yaml.invalid/testmodel.git' + revision: '0000000000000000000000000000000000000000' + entrypoint: 'testmodel.aird' + --- + [] + """ + ) + + with pytest.raises(ValueError, match="this-is-not-a-pep440-version"): + decl.apply(model, io.StringIO(yml), strict=True) + + @staticmethod + def test_url_mismatch(model: m.MelodyModel, monkeypatch): + modelinfo = m.ModelInfo( + url="git+https://decl-yaml.invalid/ignored-anyway.git", + title="Testmodel", + entrypoint=pathlib.PurePosixPath("testmodel.aird"), + resources={ + "\x00": git.GitHandlerInfo( + url="git+https://decl-yaml.invalid/testmodel.git", + branch="master", + revision="Baseline-1.0", + rev_hash="0000000000000000000000000000000000000000", + ) + }, + capella_version="5.0.0", + viewpoints={"org.polarsys.capella.core.viewpoint": "5.0.0"}, + diagram_cache=None, + ) + monkeypatch.setattr(m.MelodyModel, "info", modelinfo) + + yml = textwrap.dedent( + """\ + written_by: + capellambse: '0.0.1' + model: + url: 'git+https://decl-yaml.invalid/other.git' + revision: '0000000000000000000000000000000000000000' + entrypoint: 'testmodel.aird' + --- + [] + """ + ) + + with pytest.raises(ValueError, match=r"/other\.git"): + decl.apply(model, io.StringIO(yml), strict=True) + + @staticmethod + def test_entrypoint_mismatch(model: m.MelodyModel, monkeypatch): + modelinfo = m.ModelInfo( + url="git+https://decl-yaml.invalid/ignored-anyway.git", + title="Testmodel", + entrypoint=pathlib.PurePosixPath("testmodel.aird"), + resources={ + "\x00": git.GitHandlerInfo( + url="git+https://decl-yaml.invalid/testmodel.git", + branch="master", + revision="Baseline-1.0", + rev_hash="0000000000000000000000000000000000000000", + ) + }, + capella_version="5.0.0", + viewpoints={"org.polarsys.capella.core.viewpoint": "5.0.0"}, + diagram_cache=None, + ) + monkeypatch.setattr(m.MelodyModel, "info", modelinfo) + + yml = textwrap.dedent( + """\ + written_by: + capellambse: '0.0.1' + model: + url: git+https://decl-yaml.invalid/testmodel.git + revision: '0000000000000000000000000000000000000000' + entrypoint: othermodel.aird + --- + [] + """ + ) + + with pytest.raises(ValueError, match=r"othermodel\.aird"): + decl.apply(model, io.StringIO(yml), strict=True) + + @staticmethod + def test_model_revision_doesnt_match(model: m.MelodyModel, monkeypatch): + modelinfo = m.ModelInfo( + url="git+https://decl-yaml.invalid/ignored-anyway.git", + title="Testmodel", + entrypoint=pathlib.PurePosixPath("testmodel.aird"), + resources={ + "\x00": git.GitHandlerInfo( + url="git+https://decl-yaml.invalid/testmodel.git", + branch="master", + revision="Baseline-1.0", + rev_hash="0000000000000000000000000000000000000000", + ) + }, + capella_version="5.0.0", + viewpoints={"org.polarsys.capella.core.viewpoint": "5.0.0"}, + diagram_cache=None, + ) + monkeypatch.setattr(m.MelodyModel, "info", modelinfo) + + yml = textwrap.dedent( + """\ + written_by: + capellambse: '0.0.1' + model: + url: git+https://decl-yaml.invalid/testmodel.git + revision: 'ffffffffffffffffffffffffffffffffffffffff' + entrypoint: testmodel.aird + --- + [] + """ + ) + + with pytest.raises(ValueError, match=40 * "f"): + decl.apply(model, io.StringIO(yml), strict=True) + + @pytest.mark.parametrize("filename", ["coffee-machine.yml"]) def test_full_example(model: m.MelodyModel, filename: str): decl.apply(model, DATAPATH / filename)