Skip to content

Commit

Permalink
feat: Add implementation
Browse files Browse the repository at this point in the history
Co-authored-by: Ernst Würger <[email protected]>
Co-authored-by: Huyen Nguyen <[email protected]>
  • Loading branch information
3 people committed Oct 4, 2023
1 parent 0315c65 commit f8d50a5
Show file tree
Hide file tree
Showing 7 changed files with 638 additions and 3 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ repos:
- id: check-toml
- id: check-vcs-permalinks
- id: check-xml
- id: check-yaml
- id: debug-statements
- id: destroyed-symlinks
- id: end-of-file-fixer
Expand Down Expand Up @@ -50,6 +49,8 @@ repos:
rev: v1.5.1
hooks:
- id: mypy
additional_dependencies:
- types-pyyaml
- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.4.2
hooks:
Expand Down
324 changes: 322 additions & 2 deletions model_diffsummary/__main__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,340 @@
# Copyright DB Netz AG and contributors
# SPDX-License-Identifier: Apache-2.0
"""Main entry point into model_diffsummary."""
from __future__ import annotations

import logging
import typing as t

import capellambse
import click
import markupsafe
import yaml
from capellambse import model as m
from capellambse.model import common as c

import model_diffsummary

from . import types

logger = logging.getLogger(__name__)


@click.command()
@click.version_option(
version=model_diffsummary.__version__,
prog_name="Model-Diff Summary Tool",
message="%(prog)s %(version)s",
)
def main():
"""Console script for model_diffsummary."""
@click.argument("model", type=capellambse.cli_helpers.ModelInfoCLI())
@click.argument("old_version")
@click.argument("new_version")
@click.option("-o", "--output", type=click.File("w"), default="-")
def main(
model: dict[str, t.Any],
old_version: str,
new_version: str,
output: t.IO[str],
) -> None:
"""Generate the diff summary between two model versions.
The comparison result will be printed to stdout in YAML format.
"""
logging.basicConfig(level="DEBUG")
if "revision" in model:
del model["revision"]
old_model = capellambse.MelodyModel(**model, revision=old_version)
new_model = capellambse.MelodyModel(**model, revision=new_version)

metadata: types.Metadata = {
"model": model,
"old_revision": _get_revision_info(old_model, old_version),
"new_revision": _get_revision_info(new_model, new_version),
}
objects = _compare_all_objects(old_model, new_model)
diagrams = _compare_all_diagrams(old_model, new_model)

result: types.ChangeSummaryDocument = {
"metadata": metadata,
"diagrams": diagrams,
"objects": objects,
}

yaml.dump(result, output, Dumper=CustomYAMLDumper)


def _get_revision_info(
model: capellambse.MelodyModel,
revision: str,
) -> types.RevisionInfo:
"""Return the revision info of the given model."""
info = model.info
return {
"hash": info.rev_hash,
"revision": revision,
# TODO add author, date, description from commit
}


def _compare_all_diagrams(
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
) -> types.DiagramChanges:
result: dict[str, types.DiagramLayer] = {"oa": {}, "sa": {}}
for layer, diagtype in (
("oa", m.modeltypes.DiagramType.COC),
("oa", m.modeltypes.DiagramType.OAIB),
("oa", m.modeltypes.DiagramType.OPD),
("oa", m.modeltypes.DiagramType.OAB),
("sa", m.modeltypes.DiagramType.CC),
("sa", m.modeltypes.DiagramType.SDFB),
("sa", m.modeltypes.DiagramType.MCB),
("sa", m.modeltypes.DiagramType.SAB),
):
type_result = _compare_diagram_type(old, new, layer, diagtype)
result[layer][diagtype.name] = type_result
return t.cast(types.DiagramChanges, result)


def _compare_diagram_type(
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
layer: str,
diag_type: m.modeltypes.DiagramType,
) -> types.DiagramChange:
logger.debug("Collecting diagrams of type %s", diag_type.name)
changes: types.DiagramChange = {}

old_diags = getattr(old, layer).diagrams.by_type(diag_type)
new_diags = getattr(new, layer).diagrams.by_type(diag_type)

old_uuids = {i.uuid for i in old_diags}
new_uuids = {i.uuid for i in new_diags}

if created_uuids := sorted(new_uuids - old_uuids):
changes["created"] = [
_diag2dict(new_diags.by_uuid(i)) for i in created_uuids
]
if deleted_uuids := sorted(old_uuids - new_uuids):
changes["deleted"] = [
_diag2dict(old_diags.by_uuid(i)) for i in deleted_uuids
]

for i in old_uuids & new_uuids:
logger.debug(
"Comparing diagram %s with (new) name %s",
i,
new_diags.by_uuid(i).name,
)
if diff := _diag2diff(old_diags.by_uuid(i), new_diags.by_uuid(i)):
changes.setdefault("modified", []).append(diff)
return changes


def _diag2dict(
obj: m.diagram.Diagram | c.GenericElement,
) -> types.FullDiagram:
"""Serialize a diagram element into a dict.
This function is used for diagrams that were either created or
deleted, in which case only the names are serialized.
"""
return {"uuid": obj.uuid, "display_name": _get_name(obj)}


def _diag2diff(
old: m.diagram.Diagram, new: m.diagram.Diagram
) -> types.ChangedDiagram | None:
"""Serialize the differences between the old and new diagram.
This function is used for diagrams that were modified. Newly
introduced elements and removed elements are serialized.
The new (current) *display-name* is always serialized. If it didn't
change, it will not have the "previous" key.
The *layout_changes* flag indicates that the diagram has changed
positions, sizes or bendpoints for exchanges.
"""
changes: t.Any = {
"uuid": new.uuid,
"display_name": _get_name(new),
}

old_nodes = old.nodes
new_nodes = new.nodes
old_uuids = set(old_nodes.by_uuid)
new_uuids = set(new_nodes.by_uuid)

if introduced_uuids := sorted(new_uuids - old_uuids):
changes["introduced"] = [
_diag2dict(new_nodes.by_uuid(i)) for i in introduced_uuids
]
if removed_uuids := sorted(old_uuids - new_uuids):
changes["removed"] = [
_diag2dict(old_nodes.by_uuid(i)) for i in removed_uuids
]

# TODO: Check for layout changes by comparing the aird XML
return changes


def _diagelt2diff(
old: capellambse.diagram.DiagramElement,
new: capellambse.diagram.DiagramElement,
) -> dict[str, t.Any]:
# TODO
return {}


def _compare_all_objects(
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
) -> types.ObjectChanges:
result: dict[str, types.ObjectLayer] = {"oa": {}, "sa": {}}
for layer, objtype in (
("oa", m.oa.OperationalActivity),
("oa", m.fa.FunctionalExchange),
("oa", m.oa.OperationalCapability),
("sa", m.ctx.SystemComponent),
("sa", m.ctx.SystemFunction),
("sa", m.ctx.Capability),
):
type_result = _compare_object_type(old, new, layer, objtype)
result[layer][objtype.__name__] = type_result
return t.cast(types.ObjectChanges, result)


def _compare_object_type(
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
layer: str,
obj_type: type[c.GenericElement],
) -> types.ObjectChange:
logging.debug("Collecting objects of type %s", obj_type.__name__)
changes: types.ObjectChange = {}

old_objs = old.search(obj_type, below=getattr(old._model, layer))
new_objs = new.search(obj_type, below=getattr(new._model, layer))

old_uuids = {i.uuid for i in old_objs}
new_uuids = {i.uuid for i in new_objs}

if created_uuids := new_uuids - old_uuids:
changes["created"] = [
_obj2dict(new.by_uuid(i)) for i in sorted(created_uuids)
]
if deleted_uuids := old_uuids - new_uuids:
changes["deleted"] = [
_obj2dict(old.by_uuid(i)) for i in sorted(deleted_uuids)
]

for i in sorted(old_uuids & new_uuids):
if diff := _obj2diff(old.by_uuid(i), new.by_uuid(i)):
changes.setdefault("modified", []).append(diff)
return changes


def _obj2dict(obj: c.GenericElement) -> types.FullObject:
"""Serialize a model object into a dict.
This function is used for objects that were either created or
deleted, in which case all available attributes are serialized.
"""
attributes: dict[str, t.Any] = {}
for attr in dir(type(obj)):
acc = getattr(type(obj), attr, None)
if isinstance(acc, c.AttributeProperty):
if (val := getattr(obj, attr)) is not None:
attributes[attr] = val
return {
"uuid": obj.uuid,
"display_name": _get_name(obj),
"attributes": attributes,
}


def _obj2diff(
old: c.GenericElement, new: c.GenericElement
) -> types.ChangedObject | None:
"""Serialize the differences between the old and new object.
This function is used for objects that were modified. Only the
attributes that were changed are serialized.
The new (current) *name* is always serialized. If it didn't change,
it will not have the "previous" key.
"""
attributes: dict[str, types.ChangedAttribute] = {}
for attr in dir(type(old)):
if not isinstance(
getattr(type(old), attr, None),
(c.AttributeProperty, c.AttrProxyAccessor, c.LinkAccessor),
):
continue

old_val = getattr(old, attr)
new_val = getattr(new, attr)
if isinstance(old_val, c.GenericElement) and isinstance(
new_val, c.GenericElement
):
if old_val.uuid != new_val.uuid:
attributes[attr] = {
"previous": _serialize_obj(old_val),
"current": _serialize_obj(new_val),
}
elif isinstance(old_val, c.ElementList) and isinstance(
new_val, c.ElementList
):
if [i.uuid for i in old_val] != [i.uuid for i in new_val]:
attributes[attr] = {
"previous": _serialize_obj(old_val),
"current": _serialize_obj(new_val),
}
elif old_val != new_val:
attributes[attr] = {
"previous": _serialize_obj(old_val),
"current": _serialize_obj(new_val),
}

if not attributes:
return None
return {
"uuid": old.uuid,
"display_name": _get_name(new),
"attributes": attributes,
}


def _serialize_obj(obj: t.Any) -> types.BaseObject | list[types.BaseObject]:
if isinstance(obj, c.GenericElement):
return {"uuid": obj.uuid, "display_name": _get_name(obj)}
elif isinstance(obj, c.ElementList):
return [{"uuid": i.uuid, "display_name": _get_name(i)} for i in obj]
return obj


def _get_name(obj: c.GenericElement) -> str:
"""Return the object's name.
If the object doesn't own a name, its type is returned instead.
"""
if name := obj.name:
return name
return f"[{type(obj).__name__}]"


class CustomYAMLDumper(yaml.SafeDumper):
"""A custom YAML dumper that can serialize markupsafe.Markup."""

def represent_markup(self, data):
"""Represent markupsafe.Markup with the '!html' tag."""
return self.represent_scalar("!html", str(data))


CustomYAMLDumper.add_representer(
markupsafe.Markup, CustomYAMLDumper.represent_markup
)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit f8d50a5

Please sign in to comment.