From 066b1760efeb5f271c181cfafcdc7d506003f7e2 Mon Sep 17 00:00:00 2001 From: Michael Harbarth <michael.harbarth@deutschebahn.com> Date: Thu, 30 Nov 2023 17:40:06 +0100 Subject: [PATCH] refactor: Remove ctx from all functions outside of main, WIP --- capella2polarion/__main__.py | 64 +++- capella2polarion/elements/__init__.py | 177 +++++----- capella2polarion/elements/element.py | 193 ++++++----- capella2polarion/elements/serialize.py | 445 +++++++++++++------------ tests/test_cli.py | 1 - tests/test_elements.py | 323 ++++++++++++------ 6 files changed, 688 insertions(+), 515 deletions(-) diff --git a/capella2polarion/__main__.py b/capella2polarion/__main__.py index 23e0f2e8..d9816002 100644 --- a/capella2polarion/__main__.py +++ b/capella2polarion/__main__.py @@ -219,29 +219,61 @@ def model_elements( ( ctx.obj["ELEMENTS"], ctx.obj["POLARION_TYPE_MAP"], - ) = elements.get_elements_and_type_map(ctx.obj) + ) = elements.get_elements_and_type_map( + ctx.obj["CONFIG"], ctx.obj["MODEL"], ctx.obj["DIAGRAM_IDX"] + ) ctx.obj["CAPELLA_UUIDS"] = set(ctx.obj["POLARION_TYPE_MAP"]) - ctx.obj["TYPES"] = elements.get_types(ctx.obj) - ctx.obj["POLARION_WI_MAP"] = elements.get_polarion_wi_map(ctx.obj) + types = elements.get_types( + ctx.obj["POLARION_TYPE_MAP"], ctx.obj["ELEMENTS"] + ) + ctx.obj["POLARION_WI_MAP"] = elements.get_polarion_wi_map( + types, ctx.obj["API"] + ) ctx.obj["POLARION_ID_MAP"] = { uuid: wi.id for uuid, wi in ctx.obj["POLARION_WI_MAP"].items() } - duuids = { - diag["uuid"] for diag in ctx.obj["DIAGRAM_IDX"] if diag["success"] - } - ctx.obj["ELEMENTS"]["Diagram"] = [ - diag for diag in ctx.obj["ELEMENTS"]["Diagram"] if diag.uuid in duuids - ] + ctx.obj["DESCR_REFERENCES"] = {} - elements.element.create_work_items(ctx.obj) - elements.delete_work_items(ctx.obj) - elements.post_work_items(ctx.obj) + new_work_items = elements.element.create_work_items( + ctx.obj["ELEMENTS"], + ctx.obj["DIAGRAM_CACHE"], + ctx.obj["POLARION_TYPE_MAP"], + ctx.obj["MODEL"], + ctx.obj["POLARION_ID_MAP"], + ctx.obj["DESCR_REFERENCES"], + ) + elements.delete_work_items( + ctx.obj["POLARION_ID_MAP"], + ctx.obj["POLARION_WI_MAP"], + ctx.obj["CAPELLA_UUIDS"], + ctx.obj["API"], + ) + elements.post_work_items( + ctx.obj["POLARION_ID_MAP"], + new_work_items, + ctx.obj["POLARION_WI_MAP"], + ctx.obj["API"], + ) # Create missing links b/c of unresolved references - elements.element.create_work_items(ctx.obj) - elements.patch_work_items(ctx.obj) - - elements.make_model_elements_index(ctx.obj) + new_work_items = elements.element.create_work_items( + ctx.obj["ELEMENTS"], + ctx.obj["DIAGRAM_CACHE"], + ctx.obj["POLARION_TYPE_MAP"], + ctx.obj["MODEL"], + ctx.obj["POLARION_ID_MAP"], + ctx.obj["DESCR_REFERENCES"], + ) + elements.patch_work_items( + ctx.obj["POLARION_ID_MAP"], + ctx.obj["MODEL"], + new_work_items, + ctx.obj["POLARION_WI_MAP"], + ctx.obj["API"], + ctx.obj["DESCR_REFERENCES"], + ctx.obj["PROJECT_ID"], + ctx.obj["ROLES"], + ) if __name__ == "__main__": diff --git a/capella2polarion/elements/__init__.py b/capella2polarion/elements/__init__.py index 35980484..f29bc18c 100644 --- a/capella2polarion/elements/__init__.py +++ b/capella2polarion/elements/__init__.py @@ -8,7 +8,6 @@ "delete_work_items", "get_types", "get_elements_and_type_map", - "make_model_elements_index", "STATUS_DELETE", ] @@ -17,6 +16,7 @@ import typing as t from itertools import chain +import capellambse import polarion_rest_api_client as polarion_api import yaml from capellambse.model import common @@ -47,13 +47,12 @@ def get_polarion_wi_map( - ctx: dict[str, t.Any], type_: str = "" + _types: set, api_client: polarion_api.OpenAPIPolarionProjectClient ) -> dict[str, t.Any]: """Return a map from Capella UUIDs to Polarion work items.""" - types_ = map(helpers.resolve_element_type, ctx.get("TYPES", [])) - work_item_types = [type_] if type_ else list(types_) + work_item_types = list(map(helpers.resolve_element_type, _types)) _type = " ".join(work_item_types) - work_items = ctx["API"].get_all_work_items( + work_items = api_client.get_all_work_items( f"type:({_type})", {"workitems": "id,uuid_capella,checksum,status"} ) return { @@ -61,110 +60,116 @@ def get_polarion_wi_map( } -def delete_work_items(ctx: dict[str, t.Any]) -> None: +def delete_work_items( + polarion_id_map: dict[str, str], + polarion_wi_map: dict[str, serialize.CapellaWorkItem], + capella_uuids: set[str], + api_client: polarion_api.OpenAPIPolarionProjectClient, +) -> None: """Delete work items in a Polarion project. If the delete flag is set to ``False`` in the context work items are marked as ``to be deleted`` via the status attribute. - - Parameters - ---------- - ctx - The context for the workitem operation to be processed. """ def serialize_for_delete(uuid: str) -> str: logger.info( "Delete work item %r...", - workitem_id := ctx["POLARION_ID_MAP"][uuid], + workitem_id := polarion_id_map[uuid], ) return workitem_id existing_work_items = { uuid - for uuid, work_item in ctx["POLARION_WI_MAP"].items() + for uuid, work_item in polarion_wi_map.items() if work_item.status != "deleted" } - uuids: set[str] = existing_work_items - set(ctx["CAPELLA_UUIDS"]) + uuids: set[str] = existing_work_items - capella_uuids work_item_ids = [serialize_for_delete(uuid) for uuid in uuids] if work_item_ids: try: - ctx["API"].delete_work_items(work_item_ids) + api_client.delete_work_items(work_item_ids) for uuid in uuids: - del ctx["POLARION_WI_MAP"][uuid] - del ctx["POLARION_ID_MAP"][uuid] + del polarion_wi_map[uuid] + del polarion_id_map[uuid] except polarion_api.PolarionApiException as error: logger.error("Deleting work items failed. %s", error.args[0]) -def post_work_items(ctx: dict[str, t.Any]) -> None: - """Post work items in a Polarion project. - - Parameters - ---------- - ctx - The context for the workitem operation to be processed. - """ - work_items: list[serialize.CapellaWorkItem] = [] - for work_item in ctx["WORK_ITEMS"].values(): - if work_item.uuid_capella in ctx["POLARION_ID_MAP"]: +def post_work_items( + polarion_id_map: dict[str, str], + new_work_items: dict[str, serialize.CapellaWorkItem], + polarion_wi_map: dict[str, serialize.CapellaWorkItem], + api_client: polarion_api.OpenAPIPolarionProjectClient, +) -> None: + """Post work items in a Polarion project.""" + missing_work_items: list[serialize.CapellaWorkItem] = [] + for work_item in new_work_items.values(): + if work_item.uuid_capella in polarion_id_map: continue assert work_item is not None - work_items.append(work_item) + missing_work_items.append(work_item) logger.info("Create work item for %r...", work_item.title) - if work_items: + if missing_work_items: try: - ctx["API"].create_work_items(work_items) - workitems = {wi.uuid_capella: wi for wi in work_items if wi.id} - ctx["POLARION_WI_MAP"].update(workitems) - ctx["POLARION_ID_MAP"] = { - uuid: wi.id for uuid, wi in ctx["POLARION_WI_MAP"].items() - } + api_client.create_work_items(missing_work_items) + for work_item in missing_work_items: + polarion_id_map[work_item.uuid_capella] = work_item.id + polarion_wi_map[work_item.uuid_capella] = work_item except polarion_api.PolarionApiException as error: logger.error("Creating work items failed. %s", error.args[0]) -def patch_work_items(ctx: dict[str, t.Any]) -> None: - """Update work items in a Polarion project. - - Parameters - ---------- - ctx - The context for the workitem operation to be processed. - """ - ctx["POLARION_ID_MAP"] = uuids = { - uuid: wi.id - for uuid, wi in ctx["POLARION_WI_MAP"].items() - if wi.status == "open" and wi.uuid_capella and wi.id - } +def patch_work_items( + polarion_id_map: dict[str, str], + model: capellambse.MelodyModel, + new_work_items: dict[str, serialize.CapellaWorkItem], + polarion_wi_map: dict[str, serialize.CapellaWorkItem], + api_client: polarion_api.OpenAPIPolarionProjectClient, + descr_references, + project_id, + link_roles, +) -> None: + """Update work items in a Polarion project.""" + polarion_id_map.update( + { + uuid: wi.id + for uuid, wi in polarion_wi_map.items() + if wi.status == "open" and wi.uuid_capella and wi.id + } + ) back_links: dict[str, list[polarion_api.WorkItemLink]] = {} - for uuid in uuids: - objects = ctx["MODEL"] + for uuid in polarion_id_map: + objects = model if uuid.startswith("_"): - objects = ctx["MODEL"].diagrams - + objects = model.diagrams obj = objects.by_uuid(uuid) - work_item: serialize.CapellaWorkItem = ctx["WORK_ITEMS"][uuid] - old_work_item: serialize.CapellaWorkItem = ctx["POLARION_WI_MAP"][uuid] - links = element.create_links(obj, ctx) + links = element.create_links( + obj, + polarion_id_map, + descr_references, + project_id, + model, + link_roles, + ) + work_item: serialize.CapellaWorkItem = new_work_items[uuid] work_item.linked_work_items = links - work_item.id = old_work_item.id element.create_grouped_link_fields(work_item, back_links) - for uuid in uuids: - new_work_item: serialize.CapellaWorkItem = ctx["WORK_ITEMS"][uuid] - old_work_item = ctx["POLARION_WI_MAP"][uuid] + for uuid in polarion_id_map: + new_work_item: serialize.CapellaWorkItem = new_work_items[uuid] + old_work_item = polarion_wi_map[uuid] if old_work_item.id in back_links: element.create_grouped_back_link_fields( new_work_item, back_links[old_work_item.id] ) api_helper.patch_work_item( - ctx["API"], + api_client, new_work_item, old_work_item, old_work_item.title, @@ -172,24 +177,26 @@ def patch_work_items(ctx: dict[str, t.Any]) -> None: ) -def get_types(ctx: dict[str, t.Any]) -> set[str]: +def get_types(polarion_type_map, elements) -> set[str]: """Return a set of Polarion types from the current context.""" xtypes = set[str]() - for obj in chain.from_iterable(ctx["ELEMENTS"].values()): - xtype = ctx["POLARION_TYPE_MAP"].get(obj.uuid, type(obj).__name__) + for obj in chain.from_iterable(elements.values()): + xtype = polarion_type_map.get(obj.uuid, type(obj).__name__) xtypes.add(helpers.resolve_element_type(xtype)) return xtypes def get_elements_and_type_map( - ctx: dict[str, t.Any] + config: dict[str, t.Any], + model: capellambse.MelodyModel, + diagram_idx: list[dict[str, t.Any]], ) -> tuple[dict[str, list[common.GenericElement]], dict[str, str]]: """Return an elements and UUID to Polarion type map.""" convert_type = POL2CAPELLA_TYPES type_map: dict[str, str] = {} elements: dict[str, list[common.GenericElement]] = {} - for _below, pol_types in ctx["CONFIG"].items(): - below = getattr(ctx["MODEL"], _below) + for _below, pol_types in config.items(): + below = getattr(model, _below) for typ in pol_types: if isinstance(typ, dict): typ = list(typ.keys())[0] @@ -198,17 +205,15 @@ def get_elements_and_type_map( continue xtype = convert_type.get(typ, typ) - objects = ctx["MODEL"].search(xtype, below=below) + objects = model.search(xtype, below=below) elements.setdefault(typ, []).extend(objects) for obj in objects: type_map[obj.uuid] = typ _fix_components(elements, type_map) - diagrams_from_cache = { - d["uuid"] for d in ctx["DIAGRAM_IDX"] if d["success"] - } + diagrams_from_cache = {d["uuid"] for d in diagram_idx if d["success"]} elements["Diagram"] = [ - d for d in ctx["MODEL"].diagrams if d.uuid in diagrams_from_cache + d for d in model.diagrams if d.uuid in diagrams_from_cache ] for obj in elements["Diagram"]: type_map[obj.uuid] = "Diagram" @@ -255,34 +260,6 @@ def _fix_components( elements["PhysicalComponent"] = components -def make_model_elements_index(ctx: dict[str, t.Any]) -> None: - """Create an elements index file for all migrated elements.""" - elements: list[dict[str, t.Any]] = [] - for obj in chain.from_iterable(ctx["ELEMENTS"].values()): - element_ = {"uuid": obj.uuid, "name": obj.name} - if pid := ctx["POLARION_ID_MAP"].get(obj.uuid): - element_["id"] = pid - - for role_id in ctx["ROLES"].get(type(obj).__name__, []): - attribute = getattr(obj, role_id, None) - if attribute is None: - continue - elif isinstance(attribute, common.ElementList): - refs = [ - ctx["POLARION_ID_MAP"].get(a.uuid, a.uuid) - for a in attribute - ] - if refs: - element_[role_id] = refs - else: - element_[role_id] = ctx["POLARION_ID_MAP"].get( - attribute.uuid, attribute.uuid - ) - elements.append(element_) - - ELEMENTS_IDX_PATH.write_text(yaml.dump(elements), encoding="utf8") - - from . import ( # pylint: disable=cyclic-import api_helper, element, diff --git a/capella2polarion/elements/element.py b/capella2polarion/elements/element.py index e533f8f1..1af2410a 100644 --- a/capella2polarion/elements/element.py +++ b/capella2polarion/elements/element.py @@ -6,6 +6,7 @@ import collections.abc as cabc import functools import logging +import pathlib import typing as t from collections import defaultdict from itertools import chain @@ -27,25 +28,28 @@ def create_work_items( - ctx: dict[str, t.Any] -) -> list[serialize.CapellaWorkItem]: + elements, + diagram_cache_path: pathlib.Path, + polarion_type_map, + model, + polarion_id_map, + descr_references, +) -> dict[str, serialize.CapellaWorkItem]: """Create a list of work items for Polarion.""" - objects = chain.from_iterable(ctx["ELEMENTS"].values()) + objects = chain.from_iterable(elements.values()) _work_items = [] - serializer: cabc.Callable[ - [diag.Diagram | common.GenericElement, dict[str, t.Any]], - serialize.CapellaWorkItem, - ] + serializer = serialize.CapellaWorkItemSerializer( + diagram_cache_path, + polarion_type_map, + model, + polarion_id_map, + descr_references, + ) for obj in objects: - if isinstance(obj, diag.Diagram): - serializer = serialize.diagram - else: - serializer = serialize.generic_work_item - - _work_items.append(serialize.element(obj, ctx, serializer)) + _work_items.append(serializer.serialize(obj)) _work_items = list(filter(None, _work_items)) - valid_types = set(map(helpers.resolve_element_type, set(ctx["ELEMENTS"]))) + valid_types = set(map(helpers.resolve_element_type, set(elements))) work_items: list[serialize.CapellaWorkItem] = [] missing_types: set[str] = set() for work_item in _work_items: @@ -60,62 +64,107 @@ def create_work_items( "%r are missing in the capella2polarion configuration", ", ".join(missing_types), ) - ctx["WORK_ITEMS"] = {wi.uuid_capella: wi for wi in work_items} - return work_items + return {wi.uuid_capella: wi for wi in work_items} def create_links( - obj: common.GenericElement | diag.Diagram, ctx: dict[str, t.Any] + obj: common.GenericElement | diag.Diagram, + polarion_id_map, + descr_references, + project_id, + model, + roles, ) -> list[polarion_api.WorkItemLink]: """Create work item links for a given Capella object.""" - custom_link_resolvers = CUSTOM_LINKS reverse_type_map = TYPES_POL2CAPELLA if isinstance(obj, diag.Diagram): repres = f"<Diagram {obj.name!r}>" else: repres = obj._short_repr_() - wid = ctx["POLARION_ID_MAP"][obj.uuid] + wid = polarion_id_map[obj.uuid] ptype = reverse_type_map.get(type(obj).__name__, type(obj).__name__) new_links: list[polarion_api.WorkItemLink] = [] - for role_id in ctx["ROLES"].get(ptype, []): - if resolver := custom_link_resolvers.get(role_id): - new_links.extend(resolver(ctx, obj, role_id, {})) - continue - - if (refs := getattr(obj, role_id, None)) is None: - logger.info( - "Unable to create work item link %r for [%s]. " - "There is no %r attribute on %s", - role_id, - wid, - role_id, - repres, + for role_id in roles.get(ptype, []): + if role_id == "description_reference": + new_links.extend( + _handle_description_reference_links( + polarion_id_map, + descr_references, + project_id, + model, + obj, + role_id, + {}, + ) + ) + elif role_id == "diagram_elements": + new_links.extend( + _handle_diagram_reference_links( + polarion_id_map, model, project_id, obj, role_id, {} + ) + ) + elif role_id == "input_exchanges": + new_links.extend( + _handle_exchanges( + polarion_id_map, + model, + project_id, + obj, + role_id, + {}, + "inputs", + ) + ) + elif role_id == "output_exchanges": + new_links.extend( + _handle_exchanges( + polarion_id_map, + model, + project_id, + obj, + role_id, + {}, + "outputs", + ) ) - continue - - if isinstance(refs, common.ElementList): - new: cabc.Iterable[str] = refs.by_uuid # type: ignore[assignment] else: - assert hasattr(refs, "uuid") - new = [refs.uuid] - - new = set(_get_work_item_ids(ctx, wid, new, role_id)) - new_links.extend(_create(ctx, wid, role_id, new, {})) + if (refs := getattr(obj, role_id, None)) is None: + logger.info( + "Unable to create work item link %r for [%s]. " + "There is no %r attribute on %s", + role_id, + wid, + role_id, + repres, + ) + continue + + if isinstance(refs, common.ElementList): + new: cabc.Iterable[str] = refs.by_uuid # type: ignore[assignment] + else: + assert hasattr(refs, "uuid") + new = [refs.uuid] + + new = set( + _get_work_item_ids(polarion_id_map, model, wid, new, role_id) + ) + new_links.extend(_create(project_id, wid, role_id, new, {})) return new_links def _get_work_item_ids( - ctx: dict[str, t.Any], + polarion_id_map, + model, primary_id: str, uuids: cabc.Iterable[str], role_id: str, ) -> cabc.Iterator[str]: for uuid in uuids: - if wid := ctx["POLARION_ID_MAP"].get(uuid): + if wid := polarion_id_map.get(uuid): yield wid else: - obj = ctx["MODEL"].by_uuid(uuid) + obj = model.by_uuid(uuid) logger.info( "Unable to create work item link %r for [%s]. " "Couldn't identify work item for %r", @@ -126,28 +175,35 @@ def _get_work_item_ids( def _handle_description_reference_links( - context: dict[str, t.Any], + polarion_id_map, + descr_references, + project_id, + model, obj: common.GenericElement, role_id: str, links: dict[str, polarion_api.WorkItemLink], ) -> list[polarion_api.WorkItemLink]: - refs = context["DESCR_REFERENCES"].get(obj.uuid, []) - wid = context["POLARION_ID_MAP"][obj.uuid] - refs = set(_get_work_item_ids(context, wid, refs, role_id)) - return _create(context, wid, role_id, refs, links) + refs = descr_references.get(obj.uuid, []) + wid = polarion_id_map[obj.uuid] + refs = set(_get_work_item_ids(polarion_id_map, model, wid, refs, role_id)) + return _create(project_id, wid, role_id, refs, links) def _handle_diagram_reference_links( - context: dict[str, t.Any], + polarion_id_map, + model, + project_id, obj: diag.Diagram, role_id: str, links: dict[str, polarion_api.WorkItemLink], ) -> list[polarion_api.WorkItemLink]: try: refs = set(_collect_uuids(obj.nodes)) - wid = context["POLARION_ID_MAP"][obj.uuid] - refs = set(_get_work_item_ids(context, wid, refs, role_id)) - ref_links = _create(context, wid, role_id, refs, links) + wid = polarion_id_map[obj.uuid] + refs = set( + _get_work_item_ids(polarion_id_map, model, wid, refs, role_id) + ) + ref_links = _create(project_id, wid, role_id, refs, links) except StopIteration: logger.exception( "Could not create links for diagram %r", obj._short_repr_() @@ -169,7 +225,7 @@ def _collect_uuids( def _create( - context: dict[str, t.Any], + project_id, primary_id: str, role_id: str, new: cabc.Iterable[str], @@ -181,7 +237,7 @@ def _create( primary_id, id, role_id, - secondary_work_item_project=context["PROJECT_ID"], + secondary_work_item_project=project_id, ) for id in new ] @@ -189,19 +245,21 @@ def _create( def _handle_exchanges( - context: dict[str, t.Any], + polarion_id_map, + model, + project_id, obj: fa.Function, role_id: str, links: dict[str, polarion_api.WorkItemLink], attr: str = "inputs", ) -> list[polarion_api.WorkItemLink]: - wid = context["POLARION_ID_MAP"][obj.uuid] + wid = polarion_id_map[obj.uuid] exchanges: list[str] = [] for element in getattr(obj, attr): uuids = element.exchanges.by_uuid - exs = _get_work_item_ids(context, wid, uuids, role_id) + exs = _get_work_item_ids(polarion_id_map, model, wid, uuids, role_id) exchanges.extend(set(exs)) - return _create(context, wid, role_id, exchanges, links) + return _create(project_id, wid, role_id, exchanges, links) def create_grouped_link_fields( @@ -288,20 +346,3 @@ def _create_link_fields( "type": "text/html", "value": _make_url_list(links, reverse), } - - -CustomLinkMaker = cabc.Callable[ - [ - dict[str, t.Any], - diag.Diagram | common.GenericElement, - str, - dict[str, t.Any], - ], - list[polarion_api.WorkItemLink], -] -CUSTOM_LINKS: dict[str, CustomLinkMaker] = { - "description_reference": _handle_description_reference_links, - "diagram_elements": _handle_diagram_reference_links, - "input_exchanges": functools.partial(_handle_exchanges, attr="inputs"), - "output_exchanges": functools.partial(_handle_exchanges, attr="outputs"), -} diff --git a/capella2polarion/elements/serialize.py b/capella2polarion/elements/serialize.py index b5361520..235c1533 100644 --- a/capella2polarion/elements/serialize.py +++ b/capella2polarion/elements/serialize.py @@ -12,6 +12,7 @@ import re import typing as t +import capellambse import markupsafe import polarion_rest_api_client as polarion_api from capellambse import helpers as chelpers @@ -35,6 +36,18 @@ "</span>" ) +SERIALIZERS: dict[str, str] = { + "CapabilityRealization": "include_pre_and_post_condition", + "Capability": "include_pre_and_post_condition", + "LogicalComponent": "component_or_actor", + "OperationalCapability": "include_pre_and_post_condition", + "PhysicalComponent": "physical_component", + "SystemComponent": "component_or_actor", + "Scenario": "include_pre_and_post_condition", + "Constraint": "constraint", +} + + PrePostConditionElement = t.Union[ oa.OperationalCapability, interaction.Scenario ] @@ -56,37 +69,6 @@ class Condition(t.TypedDict): postCondition: Condition | None -def element( - obj: diagr.Diagram | common.GenericElement, - ctx: dict[str, t.Any], - serializer: cabc.Callable[[t.Any, dict[str, t.Any]], CapellaWorkItem], -) -> CapellaWorkItem | None: - """Seralize a Capella element for the PolarionRestAPI.""" - try: - return serializer(obj, ctx) - except Exception as error: - logger.error("Serializing model element failed. %s", error.args[0]) - return None - - -def diagram(diag: diagr.Diagram, ctx: dict[str, t.Any]) -> CapellaWorkItem: - """Serialize a diagram for Polarion.""" - diagram_path = ctx["DIAGRAM_CACHE"] / f"{diag.uuid}.svg" - src = _decode_diagram(diagram_path) - style = "; ".join( - (f"{key}: {value}" for key, value in DIAGRAM_STYLES.items()) - ) - description = f'<html><p><img style="{style}" src="{src}" /></p></html>' - return CapellaWorkItem( - type="diagram", - title=diag.name, - description_type="text/html", - description=description, - status="open", - uuid_capella=diag.uuid, - ) - - def _decode_diagram(diagram_path: pathlib.Path) -> str: mime_type, _ = mimetypes.guess_type(diagram_path) if mime_type is None: @@ -103,32 +85,21 @@ def _decode_diagram(diagram_path: pathlib.Path) -> str: return src -def generic_work_item( - obj: common.GenericElement, ctx: dict[str, t.Any] -) -> CapellaWorkItem: - """Return a work item for the given model element.""" - xtype = ctx["POLARION_TYPE_MAP"].get(obj.uuid, type(obj).__name__) - serializer = SERIALIZERS.get(xtype, _generic_work_item) - return serializer(obj, ctx) - - -def _generic_work_item( - obj: common.GenericElement, ctx: dict[str, t.Any] -) -> CapellaWorkItem: - xtype = ctx["POLARION_TYPE_MAP"].get(obj.uuid, type(obj).__name__) - raw_description = getattr(obj, "description", markupsafe.Markup("")) - uuids, value = _sanitize_description(obj, raw_description, ctx) - ctx.setdefault("DESCR_REFERENCES", {})[obj.uuid] = uuids - requirement_types = _get_requirement_types_text(obj) - return CapellaWorkItem( - type=helpers.resolve_element_type(xtype), - title=obj.name, - description_type="text/html", - description=value, - status="open", - uuid_capella=obj.uuid, - **requirement_types, - ) +def _format_texts( + type_texts: dict[str, list[str]] +) -> dict[str, dict[str, str]]: + def _format(texts: list[str]) -> dict[str, str]: + if len(texts) > 1: + items = "".join(f"<li>{text}</li>" for text in texts) + text = f"<ul>{items}</ul>" + else: + text = texts[0] + return {"type": "text/html", "value": text} + + requirement_types = {} + for typ, texts in type_texts.items(): + requirement_types[typ.lower()] = _format(texts) + return requirement_types def _get_requirement_types_text( @@ -152,170 +123,222 @@ def _get_requirement_types_text( return _format_texts(type_texts) -def _format_texts( - type_texts: dict[str, list[str]] -) -> dict[str, dict[str, str]]: - def _format(texts: list[str]) -> dict[str, str]: - if len(texts) > 1: - items = "".join(f"<li>{text}</li>" for text in texts) - text = f"<ul>{items}</ul>" - else: - text = texts[0] - return {"type": "text/html", "value": text} - - requirement_types = {} - for typ, texts in type_texts.items(): - requirement_types[typ.lower()] = _format(texts) - return requirement_types +def _condition(html: bool, value: str) -> CapellaWorkItem.Condition: + _type = "text/html" if html else "text/plain" + return {"type": _type, "value": value} -def _sanitize_description( - obj: common.GenericElement, descr: markupsafe.Markup, ctx: dict[str, t.Any] -) -> tuple[list[str], markupsafe.Markup]: - referenced_uuids: list[str] = [] - replaced_markup = RE_DESCR_LINK_PATTERN.sub( - lambda match: replace_markup(match, ctx, referenced_uuids), descr - ) - - def repair_images(node: etree._Element) -> None: - if node.tag != "img": - return - - file_url = pathlib.PurePosixPath(node.get("src")) - workspace = file_url.parts[0] - file_path = pathlib.PurePosixPath(*file_url.parts[1:]) - mime_type, _ = mimetypes.guess_type(file_url) - resources = ctx["MODEL"]._loader.resources - filehandler = resources[["\x00", workspace][workspace in resources]] +class CapellaWorkItemSerializer: + """The general serializer class for CapellaWorkItems.""" + + diagram_cache_path: pathlib.Path + polarion_type_map: dict[str, str] + model: capellambse.MelodyModel + polarion_id_map: dict[str, str] + descr_references: dict[str, list[str]] + serializers: dict[ + str, cabc.Callable[[common.GenericElement], CapellaWorkItem] + ] + serializer_mapping: dict[str, str] + + def __init__( + self, + diagram_cache_path: pathlib.Path, + polarion_type_map: dict[str, str], + model: capellambse.MelodyModel, + polarion_id_map: dict[str, str], + descr_references: dict[str, list[str]], + serializer_mapping: dict[str, str] | None = None, + ): + self.diagram_cache_path = diagram_cache_path + self.polarion_type_map = polarion_type_map + self.model = model + self.polarion_id_map = polarion_id_map + self.descr_references = descr_references + self.serializers = { + "include_pre_and_post_condition": self.include_pre_and_post_condition, + "component_or_actor": self.component_or_actor, + "physical_component": self.physical_component, + "constraint": self.constraint, + } + self.serializer_mapping = serializer_mapping or SERIALIZERS + + def serialize( + self, obj: diagr.Diagram | common.GenericElement + ) -> CapellaWorkItem | None: + """Return a CapellaWorkItem for the given diagram or element.""" try: - with filehandler.open(file_path, "r") as img: - b64_img = base64.b64encode(img.read()).decode("utf8") - node.attrib["src"] = f"data:{mime_type};base64,{b64_img}" - except FileNotFoundError: - logger.error( - "Inline image can't be found from %r for %r", - file_path, - obj._short_repr_(), - ) + if isinstance(obj, diagr.Diagram): + return self.diagram(obj) + else: + xtype = self.polarion_type_map.get( + obj.uuid, type(obj).__name__ + ) + serializer = self.serializers.get( + self.serializer_mapping.get(xtype, ""), + self._generic_work_item, + ) + return serializer(obj) + except Exception as error: + logger.error("Serializing model element failed. %s", error.args[0]) + return None + + def diagram(self, diag: diagr.Diagram) -> CapellaWorkItem: + """Serialize a diagram for Polarion.""" + diagram_path = self.diagram_cache_path / f"{diag.uuid}.svg" + src = _decode_diagram(diagram_path) + style = "; ".join( + (f"{key}: {value}" for key, value in DIAGRAM_STYLES.items()) + ) + description = ( + f'<html><p><img style="{style}" src="{src}" /></p></html>' + ) + return CapellaWorkItem( + type="diagram", + title=diag.name, + description_type="text/html", + description=description, + status="open", + uuid_capella=diag.uuid, + ) - repaired_markup = chelpers.process_html_fragments( - replaced_markup, repair_images - ) - return referenced_uuids, repaired_markup - - -def replace_markup( - match: re.Match, - ctx: dict[str, t.Any], - referenced_uuids: list[str], - non_matcher: cabc.Callable[[str], str] = lambda i: i, -) -> str: - """Replace UUID references in a ``match`` with a work item link. - - If the UUID doesn't correspond to an existing work item the original - text is returned. - """ - uuid = match.group(1) - if pid := ctx["POLARION_ID_MAP"].get(uuid): - referenced_uuids.append(uuid) - return POLARION_WORK_ITEM_URL.format(pid=pid) - return non_matcher(match.group(0)) - - -def include_pre_and_post_condition( - obj: PrePostConditionElement, ctx: dict[str, t.Any] -) -> CapellaWorkItem: - """Return generic attributes and pre- plus post-condition.""" - - def get_condition(cap: PrePostConditionElement, name: str) -> str: - if not (condition := getattr(cap, name)): - return "" - return condition.specification["capella:linkedText"].striptags() - - def strike_through(string: str) -> str: - if match := RE_DESCR_DELETED_PATTERN.match(string): - string = match.group(1) - return f'<span style="text-decoration: line-through;">{string}</span>' - - def matcher(match: re.Match) -> str: - return strike_through(replace_markup(match, ctx, [])) - - work_item = _generic_work_item(obj, ctx) - pre_condition = RE_DESCR_DELETED_PATTERN.sub( - matcher, get_condition(obj, "precondition") - ) - post_condition = RE_DESCR_DELETED_PATTERN.sub( - matcher, get_condition(obj, "postcondition") - ) - - work_item.preCondition = _condition(True, pre_condition) - work_item.postCondition = _condition(True, post_condition) - - return work_item - - -def get_linked_text( - obj: capellacore.Constraint, ctx: dict[str, t.Any] -) -> markupsafe.Markup: - """Return sanitized markup of the given ``obj`` linked text.""" - description = obj.specification["capella:linkedText"].striptags() - uuids, value = _sanitize_description(obj, description, ctx) - if uuids: - ctx.setdefault("DESCR_REFERENCES", {})[obj.uuid] = uuids - return value - - -def constraint( - obj: capellacore.Constraint, ctx: dict[str, t.Any] -) -> CapellaWorkItem: - """Return attributes for a ``Constraint``.""" - work_item = _generic_work_item(obj, ctx) - # pylint: disable-next=attribute-defined-outside-init - work_item.description = get_linked_text(obj, ctx) - return work_item + def _generic_work_item( + self, obj: common.GenericElement + ) -> CapellaWorkItem: + xtype = self.polarion_type_map.get(obj.uuid, type(obj).__name__) + raw_description = getattr(obj, "description", markupsafe.Markup("")) + uuids, value = self._sanitize_description(obj, raw_description) + self.descr_references[obj.uuid] = uuids + requirement_types = _get_requirement_types_text(obj) + return CapellaWorkItem( + type=helpers.resolve_element_type(xtype), + title=obj.name, + description_type="text/html", + description=value, + status="open", + uuid_capella=obj.uuid, + **requirement_types, + ) + def _sanitize_description( + self, obj: common.GenericElement, descr: markupsafe.Markup + ) -> tuple[list[str], markupsafe.Markup]: + referenced_uuids: list[str] = [] + replaced_markup = RE_DESCR_LINK_PATTERN.sub( + lambda match: self.replace_markup(match, referenced_uuids), descr + ) -def _condition(html: bool, value: str) -> CapellaWorkItem.Condition: - _type = "text/html" if html else "text/plain" - return {"type": _type, "value": value} + def repair_images(node: etree._Element) -> None: + if node.tag != "img": + return + + file_url = pathlib.PurePosixPath(node.get("src")) + workspace = file_url.parts[0] + file_path = pathlib.PurePosixPath(*file_url.parts[1:]) + mime_type, _ = mimetypes.guess_type(file_url) + resources = self.model.resources + filehandler = resources[ + ["\x00", workspace][workspace in resources] + ] + try: + with filehandler.open(file_path, "r") as img: + b64_img = base64.b64encode(img.read()).decode("utf8") + node.attrib["src"] = f"data:{mime_type};base64,{b64_img}" + except FileNotFoundError: + logger.error( + "Inline image can't be found from %r for %r", + file_path, + obj._short_repr_(), + ) + + repaired_markup = chelpers.process_html_fragments( + replaced_markup, repair_images + ) + return referenced_uuids, repaired_markup + + def replace_markup( + self, + match: re.Match, + referenced_uuids: list[str], + non_matcher: cabc.Callable[[str], str] = lambda i: i, + ) -> str: + """Replace UUID references in a ``match`` with a work item link. + + If the UUID doesn't correspond to an existing work item the + original text is returned. + """ + uuid = match.group(1) + if pid := self.polarion_id_map.get(uuid): + referenced_uuids.append(uuid) + return POLARION_WORK_ITEM_URL.format(pid=pid) + return non_matcher(match.group(0)) + + def include_pre_and_post_condition( + self, obj: PrePostConditionElement + ) -> CapellaWorkItem: + """Return generic attributes and pre- and post-condition.""" + + def get_condition(cap: PrePostConditionElement, name: str) -> str: + if not (condition := getattr(cap, name)): + return "" + return condition.specification["capella:linkedText"].striptags() + + def strike_through(string: str) -> str: + if match := RE_DESCR_DELETED_PATTERN.match(string): + string = match.group(1) + return ( + f'<span style="text-decoration: line-through;">{string}</span>' + ) + def matcher(match: re.Match) -> str: + return strike_through(self.replace_markup(match, [])) -def component_or_actor( - obj: cs.Component, ctx: dict[str, t.Any] -) -> CapellaWorkItem: - """Return attributes for a ``Component``.""" - work_item = _generic_work_item(obj, ctx) - if obj.is_actor: - xtype = RE_CAMEL_CASE_2ND_WORD_PATTERN.sub( - r"\1Actor", type(obj).__name__ + work_item = self._generic_work_item(obj) + pre_condition = RE_DESCR_DELETED_PATTERN.sub( + matcher, get_condition(obj, "precondition") + ) + post_condition = RE_DESCR_DELETED_PATTERN.sub( + matcher, get_condition(obj, "postcondition") ) - # pylint: disable-next=attribute-defined-outside-init - work_item.type = helpers.resolve_element_type(xtype) - return work_item + work_item.preCondition = _condition(True, pre_condition) + work_item.postCondition = _condition(True, post_condition) -def physical_component( - obj: pa.PhysicalComponent, ctx: dict[str, t.Any] -) -> CapellaWorkItem: - """Return attributes for a ``PhysicalComponent``.""" - work_item = component_or_actor(obj, ctx) - xtype = work_item.type - if obj.nature is not None: - # pylint: disable-next=attribute-defined-outside-init - work_item.type = f"{xtype}{obj.nature.name.capitalize()}" - return work_item + return work_item + def get_linked_text( + self, obj: capellacore.Constraint + ) -> markupsafe.Markup: + """Return sanitized markup of the given ``obj`` linked text.""" + description = obj.specification["capella:linkedText"].striptags() + uuids, value = self._sanitize_description(obj, description) + if uuids: + self.descr_references[obj.uuid] = uuids + return value -Serializer = cabc.Callable[ - [common.GenericElement, dict[str, t.Any]], CapellaWorkItem -] -SERIALIZERS: dict[str, Serializer] = { - "CapabilityRealization": include_pre_and_post_condition, - "LogicalComponent": component_or_actor, - "OperationalCapability": include_pre_and_post_condition, - "Capability": include_pre_and_post_condition, - "PhysicalComponent": physical_component, - "SystemComponent": component_or_actor, - "Scenario": include_pre_and_post_condition, - "Constraint": constraint, -} + def constraint(self, obj: capellacore.Constraint) -> CapellaWorkItem: + """Return attributes for a ``Constraint``.""" + work_item = self._generic_work_item(obj) + # pylint: disable-next=attribute-defined-outside-init + work_item.description = self.get_linked_text(obj) + return work_item + + def component_or_actor(self, obj: cs.Component) -> CapellaWorkItem: + """Return attributes for a ``Component``.""" + work_item = self._generic_work_item(obj) + if obj.is_actor: + xtype = RE_CAMEL_CASE_2ND_WORD_PATTERN.sub( + r"\1Actor", type(obj).__name__ + ) + # pylint: disable-next=attribute-defined-outside-init + work_item.type = helpers.resolve_element_type(xtype) + return work_item + + def physical_component(self, obj: pa.PhysicalComponent) -> CapellaWorkItem: + """Return attributes for a ``PhysicalComponent``.""" + work_item = self.component_or_actor(obj) + xtype = work_item.type + if obj.nature is not None: + # pylint: disable-next=attribute-defined-outside-init + work_item.type = f"{xtype}{obj.nature.name.capitalize()}" + return work_item diff --git a/tests/test_cli.py b/tests/test_cli.py index 0b752e17..cb7586d4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -89,4 +89,3 @@ def test_migrate_model_elements(monkeypatch: pytest.MonkeyPatch): assert mock_delete_work_items.call_count == 1 assert mock_patch_work_items.call_count == 1 assert mock_post_work_items.call_count == 1 - assert ELEMENTS_IDX_PATH.exists() diff --git a/tests/test_elements.py b/tests/test_elements.py index c9ef64e8..fbdc707a 100644 --- a/tests/test_elements.py +++ b/tests/test_elements.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging +import pathlib import typing as t from unittest import mock @@ -56,7 +57,7 @@ "type": "diagram", "status": "open", "additional_attributes": { - "uuid_capella": "_APMboAPhEeynfbzU12yy7w", + "uuid_capella": TEST_DIAG_UUID, }, } TEST_WI_CHECKSUM = ( @@ -123,7 +124,7 @@ class TestDiagramElements: @staticmethod @pytest.fixture - def context( + def ctx( diagram_cache_index: list[dict[str, t.Any]], model: capellambse.MelodyModel, ) -> dict[str, t.Any]: @@ -143,13 +144,20 @@ def context( } @staticmethod - def test_create_diagrams(context: dict[str, t.Any]): - context["ELEMENTS"] = {"Diagram": context["MODEL"].diagrams} - - diagrams = element.create_work_items(context) + def test_create_diagrams(ctx: dict[str, t.Any]): + ctx["ELEMENTS"] = {"Diagram": ctx["MODEL"].diagrams} + + diagrams = element.create_work_items( + ctx["ELEMENTS"], + ctx["DIAGRAM_CACHE"], + {}, + ctx["MODEL"], + ctx["POLARION_ID_MAP"], + {}, + ) assert len(diagrams) == 1 - work_item = diagrams[0] + work_item = diagrams[TEST_DIAG_UUID] work_item.calculate_checksum() assert isinstance(work_item, serialize.CapellaWorkItem) assert { @@ -165,25 +173,34 @@ def test_create_diagrams(context: dict[str, t.Any]): @staticmethod def test_create_diagrams_filters_non_diagram_elements( - monkeypatch: pytest.MonkeyPatch, context: dict[str, t.Any] + ctx: dict[str, t.Any] ): - context["ELEMENTS"] = {"Diagram": context["MODEL"].diagrams} - attributes = mock.MagicMock() - attributes.return_value = None - monkeypatch.setattr(serialize, "element", attributes) - - element.create_work_items(context) + ctx["ELEMENTS"] = {"Diagram": ctx["MODEL"].diagrams} + + element.create_work_items( + ctx["ELEMENTS"], + ctx["DIAGRAM_CACHE"], + {}, + ctx["MODEL"], + ctx["POLARION_ID_MAP"], + {}, + ) - assert context["API"].create_work_items.call_count == 0 + assert ctx["API"].create_work_items.call_count == 0 @staticmethod - def test_delete_diagrams(context: dict[str, t.Any]): - context["CAPELLA_UUIDS"] = [] - - elements.delete_work_items(context) + def test_delete_diagrams(ctx: dict[str, t.Any]): + ctx["CAPELLA_UUIDS"] = set() + + elements.delete_work_items( + ctx["POLARION_ID_MAP"], + ctx["POLARION_WI_MAP"], + ctx["CAPELLA_UUIDS"], + ctx["API"], + ) - assert context["API"].delete_work_items.call_count == 1 - assert context["API"].delete_work_items.call_args[0][0] == ["Diag-1"] + assert ctx["API"].delete_work_items.call_count == 1 + assert ctx["API"].delete_work_items.call_args[0][0] == ["Diag-1"] class FakeModelObject: @@ -216,7 +233,7 @@ class UnsupportedFakeModelObject(FakeModelObject): class TestModelElements: @staticmethod @pytest.fixture - def context(model: capellambse.MelodyModel) -> dict[str, t.Any]: + def ctx(model: capellambse.MelodyModel) -> dict[str, t.Any]: api = mock.MagicMock(spec=polarion_api.OpenAPIPolarionProjectClient) fake = FakeModelObject("uuid1", name="Fake 1") work_item = serialize.CapellaWorkItem( @@ -225,6 +242,7 @@ def context(model: capellambse.MelodyModel) -> dict[str, t.Any]: return { "API": api, "PROJECT_ID": "project_id", + "DIAGRAM_CACHE": pathlib.Path(""), "ELEMENTS": { "FakeModelObject": [ fake, @@ -245,14 +263,14 @@ def context(model: capellambse.MelodyModel) -> dict[str, t.Any]: @staticmethod def test_create_work_items( - monkeypatch: pytest.MonkeyPatch, context: dict[str, t.Any] + monkeypatch: pytest.MonkeyPatch, ctx: dict[str, t.Any] ): - del context["ELEMENTS"]["UnsupportedFakeModelObject"] - context["MODEL"] = model = mock.MagicMock() - model.by_uuid.side_effect = context["ELEMENTS"]["FakeModelObject"] + del ctx["ELEMENTS"]["UnsupportedFakeModelObject"] + ctx["MODEL"] = model = mock.MagicMock() + model.by_uuid.side_effect = ctx["ELEMENTS"]["FakeModelObject"] monkeypatch.setattr( - serialize, - "generic_work_item", + serialize.CapellaWorkItemSerializer, + "serialize", mock_generic_work_item := mock.MagicMock(), ) mock_generic_work_item.side_effect = [ @@ -272,16 +290,23 @@ def test_create_work_items( ), ] - work_items = element.create_work_items(context) + work_items = element.create_work_items( + ctx["ELEMENTS"], + ctx["DIAGRAM_CACHE"], + ctx["POLARION_TYPE_MAP"], + ctx["MODEL"], + ctx["POLARION_ID_MAP"], + {}, + ) - assert work_items == [expected, expected1] + assert list(work_items.values()) == [expected, expected1] @staticmethod - def test_create_links_custom_resolver(context: dict[str, t.Any]): - obj = context["ELEMENTS"]["FakeModelObject"][1] - context["POLARION_ID_MAP"]["uuid2"] = "Obj-2" - context["ROLES"] = {"FakeModelObject": ["description_reference"]} - context["DESCR_REFERENCES"] = {"uuid2": ["uuid1"]} + def test_create_links_custom_resolver(ctx: dict[str, t.Any]): + obj = ctx["ELEMENTS"]["FakeModelObject"][1] + ctx["POLARION_ID_MAP"]["uuid2"] = "Obj-2" + ctx["ROLES"] = {"FakeModelObject": ["description_reference"]} + ctx["DESCR_REFERENCES"] = {"uuid2": ["uuid1"]} expected = polarion_api.WorkItemLink( "Obj-2", "Obj-1", @@ -289,19 +314,26 @@ def test_create_links_custom_resolver(context: dict[str, t.Any]): secondary_work_item_project="project_id", ) - links = element.create_links(obj, context) + links = element.create_links( + obj, + ctx["POLARION_ID_MAP"], + ctx["DESCR_REFERENCES"], + ctx["PROJECT_ID"], + ctx["MODEL"], + ctx["ROLES"], + ) assert links == [expected] @staticmethod - def test_create_links_custom_exchanges_resolver(context: dict[str, t.Any]): + def test_create_links_custom_exchanges_resolver(ctx: dict[str, t.Any]): function_uuid = "ceffa011-7b66-4b3c-9885-8e075e312ffa" - obj = context["MODEL"].by_uuid(function_uuid) - context["POLARION_ID_MAP"][function_uuid] = "Obj-1" - context["POLARION_ID_MAP"][ + obj = ctx["MODEL"].by_uuid(function_uuid) + ctx["POLARION_ID_MAP"][function_uuid] = "Obj-1" + ctx["POLARION_ID_MAP"][ "1a414995-f4cd-488c-8152-486e459fb9de" ] = "Obj-2" - context["ROLES"] = {"SystemFunction": ["input_exchanges"]} + ctx["ROLES"] = {"SystemFunction": ["input_exchanges"]} expected = polarion_api.WorkItemLink( "Obj-1", "Obj-2", @@ -309,15 +341,22 @@ def test_create_links_custom_exchanges_resolver(context: dict[str, t.Any]): secondary_work_item_project="project_id", ) - links = element.create_links(obj, context) + links = element.create_links( + obj, + ctx["POLARION_ID_MAP"], + {}, + ctx["PROJECT_ID"], + ctx["MODEL"], + ctx["ROLES"], + ) assert links == [expected] @staticmethod def test_create_links_missing_attribute( - context: dict[str, t.Any], caplog: pytest.LogCaptureFixture + ctx: dict[str, t.Any], caplog: pytest.LogCaptureFixture ): - obj = context["ELEMENTS"]["FakeModelObject"][0] + obj = ctx["ELEMENTS"]["FakeModelObject"][0] expected = ( "Unable to create work item link 'attribute' for [Obj-1]. " "There is no 'attribute' attribute on " @@ -325,26 +364,31 @@ def test_create_links_missing_attribute( ) with caplog.at_level(logging.DEBUG): - links = element.create_links(obj, context) + links = element.create_links( + obj, + ctx["POLARION_ID_MAP"], + {}, + ctx["PROJECT_ID"], + ctx["MODEL"], + ctx["ROLES"], + ) assert not links assert caplog.messages[0] == expected @staticmethod - def test_create_links_from_ElementList(context: dict[str, t.Any]): + def test_create_links_from_ElementList(ctx: dict[str, t.Any]): fake = FakeModelObject("uuid4", name="Fake 4") fake1 = FakeModelObject("uuid5", name="Fake 5") obj = FakeModelObject( "uuid6", name="Fake 6", attribute=common.ElementList( - context["MODEL"], [fake, fake1], FakeModelObject + ctx["MODEL"], [fake, fake1], FakeModelObject ), ) - context["ELEMENTS"]["FakeModelObject"].append(obj) - context["POLARION_ID_MAP"] |= { - f"uuid{i}": f"Obj-{i}" for i in range(4, 7) - } + ctx["ELEMENTS"]["FakeModelObject"].append(obj) + ctx["POLARION_ID_MAP"] |= {f"uuid{i}": f"Obj-{i}" for i in range(4, 7)} expected_link = polarion_api.WorkItemLink( "Obj-6", "Obj-5", @@ -358,15 +402,22 @@ def test_create_links_from_ElementList(context: dict[str, t.Any]): secondary_work_item_project="project_id", ) - links = element.create_links(obj, context) # type: ignore[arg-type] + links = element.create_links( + obj, + ctx["POLARION_ID_MAP"], + {}, + ctx["PROJECT_ID"], + ctx["MODEL"], + ctx["ROLES"], + ) # type: ignore[arg-type] assert expected_link in links assert expected_link1 in links @staticmethod - def test_create_link_from_single_attribute(context: dict[str, t.Any]): - obj = context["ELEMENTS"]["FakeModelObject"][1] - context["POLARION_ID_MAP"]["uuid2"] = "Obj-2" + def test_create_link_from_single_attribute(ctx: dict[str, t.Any]): + obj = ctx["ELEMENTS"]["FakeModelObject"][1] + ctx["POLARION_ID_MAP"]["uuid2"] = "Obj-2" expected = polarion_api.WorkItemLink( "Obj-2", "Obj-1", @@ -374,15 +425,22 @@ def test_create_link_from_single_attribute(context: dict[str, t.Any]): secondary_work_item_project="project_id", ) - links = element.create_links(obj, context) + links = element.create_links( + obj, + ctx["POLARION_ID_MAP"], + {}, + ctx["PROJECT_ID"], + ctx["MODEL"], + ctx["ROLES"], + ) assert links == [expected] @staticmethod def test_update_work_items( - monkeypatch: pytest.MonkeyPatch, context: dict[str, t.Any] + monkeypatch: pytest.MonkeyPatch, ctx: dict[str, t.Any] ): - context["POLARION_WI_MAP"]["uuid1"] = serialize.CapellaWorkItem( + ctx["POLARION_WI_MAP"]["uuid1"] = serialize.CapellaWorkItem( id="Obj-1", type="type", uuid_capella="uuid1", @@ -396,8 +454,8 @@ def test_update_work_items( monkeypatch.setattr( elements, "get_polarion_wi_map", mock_get_polarion_wi_map ) - mock_get_polarion_wi_map.return_value = context["POLARION_WI_MAP"] - context["WORK_ITEMS"] = { + mock_get_polarion_wi_map.return_value = ctx["POLARION_WI_MAP"] + ctx["WORK_ITEMS"] = { "uuid1": serialize.CapellaWorkItem( id="Obj-1", uuid_capella="uuid1", @@ -406,18 +464,25 @@ def test_update_work_items( description=markupsafe.Markup(""), ) } - context["MODEL"] = mock_model = mock.MagicMock() - mock_model.by_uuid.return_value = context["ELEMENTS"][ - "FakeModelObject" - ][0] - - elements.patch_work_items(context) - - assert context["API"].get_all_work_item_links.call_count == 1 - assert context["API"].delete_work_item_links.call_count == 0 - assert context["API"].create_work_item_links.call_count == 0 - assert context["API"].update_work_item.call_count == 1 - work_item = context["API"].update_work_item.call_args[0][0] + ctx["MODEL"] = mock_model = mock.MagicMock() + mock_model.by_uuid.return_value = ctx["ELEMENTS"]["FakeModelObject"][0] + + elements.patch_work_items( + ctx["POLARION_ID_MAP"], + ctx["MODEL"], + ctx["WORK_ITEMS"], + ctx["POLARION_WI_MAP"], + ctx["API"], + {}, + ctx["PROJECT_ID"], + ctx["ROLES"], + ) + + assert ctx["API"].get_all_work_item_links.call_count == 1 + assert ctx["API"].delete_work_item_links.call_count == 0 + assert ctx["API"].create_work_item_links.call_count == 0 + assert ctx["API"].update_work_item.call_count == 1 + work_item = ctx["API"].update_work_item.call_args[0][0] assert isinstance(work_item, serialize.CapellaWorkItem) assert work_item.id == "Obj-1" assert work_item.title == "Fake 1" @@ -429,36 +494,54 @@ def test_update_work_items( @staticmethod def test_update_work_items_filters_work_items_with_same_checksum( - context: dict[str, t.Any] + ctx: dict[str, t.Any] ): - context["POLARION_WI_MAP"]["uuid1"] = serialize.CapellaWorkItem( + ctx["POLARION_WI_MAP"]["uuid1"] = serialize.CapellaWorkItem( checksum=TEST_WI_CHECKSUM, ) - elements.patch_work_items(context) + elements.patch_work_items( + ctx["POLARION_ID_MAP"], + ctx["MODEL"], + ctx["WORK_ITEMS"], + ctx["POLARION_WI_MAP"], + ctx["API"], + {}, + ctx["PROJECT_ID"], + ctx["ROLES"], + ) - assert context["API"].update_work_item.call_count == 0 + assert ctx["API"].update_work_item.call_count == 0 @staticmethod - def test_update_links_with_no_elements(context: dict[str, t.Any]): - context["POLARION_WI_MAP"] = {} - - elements.patch_work_items(context) + def test_update_links_with_no_elements(ctx: dict[str, t.Any]): + ctx["POLARION_WI_MAP"] = {} + + elements.patch_work_items( + ctx["POLARION_ID_MAP"], + ctx["MODEL"], + ctx["WORK_ITEMS"], + ctx["POLARION_WI_MAP"], + ctx["API"], + {}, + ctx["PROJECT_ID"], + ctx["ROLES"], + ) - assert context["API"].get_all_work_item_links.call_count == 0 + assert ctx["API"].get_all_work_item_links.call_count == 0 @staticmethod def test_update_links( - monkeypatch: pytest.MonkeyPatch, context: dict[str, t.Any] + monkeypatch: pytest.MonkeyPatch, ctx: dict[str, t.Any] ): link = polarion_api.WorkItemLink( "Obj-1", "Obj-2", "attribute", True, "project_id" ) - context["POLARION_WI_MAP"]["uuid1"].linked_work_items = [link] - context["POLARION_WI_MAP"]["uuid2"] = serialize.CapellaWorkItem( + ctx["POLARION_WI_MAP"]["uuid1"].linked_work_items = [link] + ctx["POLARION_WI_MAP"]["uuid2"] = serialize.CapellaWorkItem( id="Obj-2", uuid_capella="uuid2", status="open" ) - context["WORK_ITEMS"] = { + ctx["WORK_ITEMS"] = { "uuid1": serialize.CapellaWorkItem( id="Obj-1", uuid_capella="uuid1", status="open" ), @@ -470,37 +553,46 @@ def test_update_links( monkeypatch.setattr( elements, "get_polarion_wi_map", mock_get_polarion_wi_map ) - mock_get_polarion_wi_map.return_value = context["POLARION_WI_MAP"] - context["API"].get_all_work_item_links.side_effect = ( + mock_get_polarion_wi_map.return_value = ctx["POLARION_WI_MAP"] + ctx["API"].get_all_work_item_links.side_effect = ( [link], [], ) - context["MODEL"] = mock_model = mock.MagicMock() - mock_model.by_uuid.side_effect = context["ELEMENTS"]["FakeModelObject"] + ctx["MODEL"] = mock_model = mock.MagicMock() + mock_model.by_uuid.side_effect = ctx["ELEMENTS"]["FakeModelObject"] expected_new_link = polarion_api.WorkItemLink( "Obj-2", "Obj-1", "attribute", None, "project_id" ) - elements.patch_work_items(context) + elements.patch_work_items( + ctx["POLARION_ID_MAP"], + ctx["MODEL"], + ctx["WORK_ITEMS"], + ctx["POLARION_WI_MAP"], + ctx["API"], + {}, + ctx["PROJECT_ID"], + ctx["ROLES"], + ) - links = context["API"].get_all_work_item_links.call_args_list - assert context["API"].get_all_work_item_links.call_count == 2 + links = ctx["API"].get_all_work_item_links.call_args_list + assert ctx["API"].get_all_work_item_links.call_count == 2 assert [links[0][0][0], links[1][0][0]] == ["Obj-1", "Obj-2"] - new_links = context["API"].create_work_item_links.call_args[0][0] - assert context["API"].create_work_item_links.call_count == 1 + new_links = ctx["API"].create_work_item_links.call_args[0][0] + assert ctx["API"].create_work_item_links.call_count == 1 assert new_links == [expected_new_link] - assert context["API"].delete_work_item_links.call_count == 1 - assert context["API"].delete_work_item_links.call_args[0][0] == [link] + assert ctx["API"].delete_work_item_links.call_count == 1 + assert ctx["API"].delete_work_item_links.call_args[0][0] == [link] @staticmethod def test_patch_work_item_grouped_links( monkeypatch: pytest.MonkeyPatch, - context: dict[str, t.Any], + ctx: dict[str, t.Any], dummy_work_items, ): - context["WORK_ITEMS"] = dummy_work_items + ctx["WORK_ITEMS"] = dummy_work_items - context["POLARION_WI_MAP"] = { + ctx["POLARION_WI_MAP"] = { "uuid0": serialize.CapellaWorkItem( id="Obj-0", uuid_capella="uuid0", status="open" ), @@ -513,7 +605,7 @@ def test_patch_work_item_grouped_links( } mock_create_links = mock.MagicMock() monkeypatch.setattr(element, "create_links", mock_create_links) - mock_create_links.side_effect = lambda obj, ctx: dummy_work_items[ + mock_create_links.side_effect = lambda obj, *args: dummy_work_items[ obj.uuid ].linked_work_items @@ -533,14 +625,23 @@ def mock_back_link(work_item, back_links): mock_grouped_links_reverse, ) - context["MODEL"] = mock_model = mock.MagicMock() + ctx["MODEL"] = mock_model = mock.MagicMock() mock_model.by_uuid.side_effect = [ FakeModelObject(f"uuid{i}", name=f"Fake {i}") for i in range(3) ] - elements.patch_work_items(context) + elements.patch_work_items( + ctx["POLARION_ID_MAP"], + ctx["MODEL"], + ctx["WORK_ITEMS"], + ctx["POLARION_WI_MAP"], + ctx["API"], + {}, + ctx["PROJECT_ID"], + ctx["ROLES"], + ) - update_work_item_calls = context["API"].update_work_item.call_args_list + update_work_item_calls = ctx["API"].update_work_item.call_args_list assert len(update_work_item_calls) == 3 mock_grouped_links_calls = mock_grouped_links.call_args_list @@ -666,9 +767,11 @@ class TestSerializers: def test_diagram(model: capellambse.MelodyModel): diag = model.diagrams.by_uuid(TEST_DIAG_UUID) - serialized_diagram = serialize.diagram( - diag, {"DIAGRAM_CACHE": TEST_DIAGRAM_CACHE} + serializer = serialize.CapellaWorkItemSerializer( + TEST_DIAGRAM_CACHE, {}, model, {}, {} ) + + serialized_diagram = serializer.serialize(diag) serialized_diagram.description = None assert serialized_diagram == serialize.CapellaWorkItem( @@ -819,13 +922,11 @@ def test_generic_work_item( ): obj = model.by_uuid(uuid) - work_item = serialize.generic_work_item( - obj, - { - "POLARION_ID_MAP": TEST_POL_ID_MAP, - "POLARION_TYPE_MAP": TEST_POL_TYPE_MAP, - }, + serializer = serialize.CapellaWorkItemSerializer( + pathlib.Path(""), TEST_POL_TYPE_MAP, model, TEST_POL_ID_MAP, {} ) + + work_item = serializer.serialize(obj) status = work_item.status work_item.status = None