Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/wip-diff-objects' into basic-report
Browse files Browse the repository at this point in the history
  • Loading branch information
vik378 committed Oct 6, 2023
2 parents 0b4430e + 3efcbfc commit 143f69b
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 66 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ repos:
- id: check-toml
- id: check-vcs-permalinks
- id: check-xml
- id: check-yaml
- id: debug-statements
- id: destroyed-symlinks
- id: end-of-file-fixer
Expand Down Expand Up @@ -50,6 +49,8 @@ repos:
rev: v1.5.1
hooks:
- id: mypy
additional_dependencies:
- types-pyyaml
- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.4.2
hooks:
Expand Down
136 changes: 92 additions & 44 deletions model_diffsummary/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
"""Main entry point into model_diffsummary."""
from __future__ import annotations

import datetime
import logging
import pathlib
import typing as t

import capellambse
import click
import markupsafe
import yaml
from capellambse import model as m
from capellambse.filehandler import git as gitfh
from capellambse.model import common as c

import model_diffsummary

from . import types

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -45,24 +48,52 @@ def main(
old_model = capellambse.MelodyModel(**model, revision=old_version)
new_model = capellambse.MelodyModel(**model, revision=new_version)

metadata = {
metadata: types.Metadata = {
"model": model,
"old-revision": old_version,
"new-revision": new_version,
"old_revision": _get_revision_info(old_model, old_version),
"new_revision": _get_revision_info(new_model, new_version),
}
diagrams = _compare_all_diagrams(old_model, new_model)

objects = _compare_all_objects(old_model, new_model)
diagrams = _compare_all_diagrams(old_model, new_model)

result = {"metadata": metadata, "diagrams": diagrams, "objects": objects}
result: types.ChangeSummaryDocument = {
"metadata": metadata,
"diagrams": diagrams,
"objects": objects,
}

yaml.dump(result, output, Dumper=CustomYAMLDumper)


def _get_revision_info(
model: capellambse.MelodyModel,
revision: str,
) -> types.RevisionInfo:
"""Return the revision info of the given model."""
info = model.info
fh = model._loader.resources["\x00"]
assert isinstance(fh, gitfh.GitFileHandler)
author, date_str, description = fh._git(
"log",
"-1",
"--format=%aN%x00%aI%x00%B",
info.rev_hash,
encoding="utf-8",
).split("\x00")
return {
"hash": info.rev_hash,
"revision": revision,
"author": author,
"date": datetime.datetime.fromisoformat(date_str),
"description": description.rstrip(),
}


def _compare_all_diagrams(
old: capellambse.MelodyModel, new: capellambse.MelodyModel
) -> t.Any:
result: t.Any = {"oa": {}, "la": {}, "pa": {}, "sa": {}}
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
) -> types.DiagramChanges:
result: dict[str, types.DiagramLayer] = {"oa": {}, "sa": {}}
for layer, diagtype in (
("oa", m.modeltypes.DiagramType.COC),
("oa", m.modeltypes.DiagramType.OAIB),
Expand All @@ -75,51 +106,58 @@ def _compare_all_diagrams(
):
type_result = _compare_diagram_type(old, new, layer, diagtype)
result[layer][diagtype.name] = type_result
return result
return t.cast(types.DiagramChanges, result)


def _compare_diagram_type(
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
layer: str,
diag_type: m.modeltypes.DiagramType,
) -> t.Any:
logging.debug("Collecting diagrams of type %s", diag_type.name)
changes = {}
) -> types.DiagramChange:
logger.debug("Collecting diagrams of type %s", diag_type.name)
changes: types.DiagramChange = {}

old_diags = getattr(old, layer).diagrams.by_type(diag_type)
new_diags = getattr(new, layer).diagrams.by_type(diag_type)

old_uuids = {i.uuid for i in old_diags}
new_uuids = {i.uuid for i in new_diags}

if created_uuids := new_uuids - old_uuids:
if created_uuids := sorted(new_uuids - old_uuids):
changes["created"] = [
_diag2dict(new_diags.by_uuid(i)) for i in created_uuids
]
if deleted_uuids := old_uuids - new_uuids:
if deleted_uuids := sorted(old_uuids - new_uuids):
changes["deleted"] = [
_diag2dict(old_diags.by_uuid(i)) for i in deleted_uuids
]

for i in old_uuids & new_uuids:
logger.debug(
"Comparing diagram %s with (new) name %s",
i,
new_diags.by_uuid(i).name,
)
if diff := _diag2diff(old_diags.by_uuid(i), new_diags.by_uuid(i)):
changes.setdefault("modified", []).append(diff)
return changes


def _diag2dict(obj: m.diagram.Diagram | c.GenericElement) -> dict[str, t.Any]:
def _diag2dict(
obj: m.diagram.Diagram | c.GenericElement,
) -> types.FullDiagram:
"""Serialize a diagram element into a dict.
This function is used for diagrams that were either created or
deleted, in which case only the names are serialized.
"""
return {"uuid": obj.uuid, "name": _get_name(obj)}
return {"uuid": obj.uuid, "display_name": _get_name(obj)}


def _diag2diff(
old: m.diagram.Diagram, new: m.diagram.Diagram
) -> dict[str, t.Any] | None:
) -> types.ChangedDiagram | None:
"""Serialize the differences between the old and new diagram.
This function is used for diagrams that were modified. Newly
Expand All @@ -131,18 +169,23 @@ def _diag2diff(
The *layout_changes* flag indicates that the diagram has changed
positions, sizes or bendpoints for exchanges.
"""
changes: t.Any = {}
changes: t.Any = {
"uuid": new.uuid,
"display_name": _get_name(new),
}

old_uuids = set(old.nodes.by_uuid)
new_uuids = set(new.nodes.by_uuid)
old_nodes = old.nodes
new_nodes = new.nodes
old_uuids = set(old_nodes.by_uuid)
new_uuids = set(new_nodes.by_uuid)

if created_uuids := new_uuids - old_uuids:
if introduced_uuids := sorted(new_uuids - old_uuids):
changes["introduced"] = [
_diag2dict(new.nodes.by_uuid(i)) for i in created_uuids
_diag2dict(new_nodes.by_uuid(i)) for i in introduced_uuids
]
if deleted_uuids := old_uuids - new_uuids:
if removed_uuids := sorted(old_uuids - new_uuids):
changes["removed"] = [
_diag2dict(old.nodes.by_uuid(i)) for i in deleted_uuids
_diag2dict(old_nodes.by_uuid(i)) for i in removed_uuids
]

# TODO: Check for layout changes by comparing the aird XML
Expand All @@ -158,9 +201,10 @@ def _diagelt2diff(


def _compare_all_objects(
old: capellambse.MelodyModel, new: capellambse.MelodyModel
) -> t.Any:
result: t.Any = {"oa": {}, "la": {}, "pa": {}, "sa": {}}
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
) -> types.ObjectChanges:
result: dict[str, types.ObjectLayer] = {"oa": {}, "sa": {}}
for layer, objtype in (
("oa", m.oa.OperationalActivity),
("oa", m.fa.FunctionalExchange),
Expand All @@ -171,17 +215,17 @@ def _compare_all_objects(
):
type_result = _compare_object_type(old, new, layer, objtype)
result[layer][objtype.__name__] = type_result
return result
return t.cast(types.ObjectChanges, result)


def _compare_object_type(
old: capellambse.MelodyModel,
new: capellambse.MelodyModel,
layer: str,
obj_type: type[c.GenericElement],
) -> t.Any:
) -> types.ObjectChange:
logging.debug("Collecting objects of type %s", obj_type.__name__)
changes = {}
changes: types.ObjectChange = {}

old_objs = old.search(obj_type, below=getattr(old._model, layer))
new_objs = new.search(obj_type, below=getattr(new._model, layer))
Expand All @@ -204,24 +248,28 @@ def _compare_object_type(
return changes


def _obj2dict(obj: c.GenericElement) -> dict[str, t.Any]:
def _obj2dict(obj: c.GenericElement) -> types.FullObject:
"""Serialize a model object into a dict.
This function is used for objects that were either created or
deleted, in which case all available attributes are serialized.
"""
result = {}
attributes: dict[str, t.Any] = {}
for attr in dir(type(obj)):
acc = getattr(type(obj), attr, None)
if isinstance(acc, c.AttributeProperty):
if (val := getattr(obj, attr)) is not None:
result[attr] = val
return result
attributes[attr] = val
return {
"uuid": obj.uuid,
"display_name": _get_name(obj),
"attributes": attributes,
}


def _obj2diff(
old: c.GenericElement, new: c.GenericElement
) -> dict[str, t.Any] | None:
) -> types.ChangedObject | None:
"""Serialize the differences between the old and new object.
This function is used for objects that were modified. Only the
Expand All @@ -230,7 +278,7 @@ def _obj2diff(
The new (current) *name* is always serialized. If it didn't change,
it will not have the "previous" key.
"""
changes: t.Any = {}
attributes: dict[str, types.ChangedAttribute] = {}
for attr in dir(type(old)):
if not isinstance(
getattr(type(old), attr, None),
Expand All @@ -244,34 +292,34 @@ def _obj2diff(
new_val, c.GenericElement
):
if old_val.uuid != new_val.uuid:
changes[attr] = {
attributes[attr] = {
"previous": _serialize_obj(old_val),
"current": _serialize_obj(new_val),
}
elif isinstance(old_val, c.ElementList) and isinstance(
new_val, c.ElementList
):
if [i.uuid for i in old_val] != [i.uuid for i in new_val]:
changes[attr] = {
attributes[attr] = {
"previous": _serialize_obj(old_val),
"current": _serialize_obj(new_val),
}
elif old_val != new_val:
changes[attr] = {
attributes[attr] = {
"previous": _serialize_obj(old_val),
"current": _serialize_obj(new_val),
}

if not changes:
if not attributes:
return None
return {
"uuid": old.uuid,
"display_name": _get_name(new),
"changes": changes,
"attributes": attributes,
}


def _serialize_obj(obj: t.Any) -> dict[str, t.Any] | list[dict[str, t.Any]]:
def _serialize_obj(obj: t.Any) -> types.BaseObject | list[types.BaseObject]:
if isinstance(obj, c.GenericElement):
return {"uuid": obj.uuid, "display_name": _get_name(obj)}
elif isinstance(obj, c.ElementList):
Expand Down
Loading

0 comments on commit 143f69b

Please sign in to comment.