
refactor(converter): Apply changes from code review
ewuerger committed Feb 1, 2024
1 parent a288dbd commit 62d57e7
Showing 2 changed files with 34 additions and 50 deletions.
62 changes: 34 additions & 28 deletions capella2polarion/converters/element_converter.py
@@ -154,26 +154,25 @@ def serialize_all(self) -> list[data_models.CapellaWorkItem]:
     def serialize(self, uuid: str) -> data_models.CapellaWorkItem | None:
         """Return a CapellaWorkItem for the given diagram or element."""
         converter_data = self.converter_session[uuid]
-        work_item = data_models.CapellaWorkItem(uuid)
-        nothing = True
+        self._generic_work_item(converter_data)
+
         for converter in converter_data.type_config.converters or []:
             try:
                 serializer: cabc.Callable[
                     [data_session.ConverterData], data_models.CapellaWorkItem
-                ] = getattr(self, f"_{converter}", self._generic_work_item)
-                converter_data.work_item = serializer(converter_data)
-                work_item += converter_data.work_item
-                nothing = False
+                ] = getattr(self, f"_{converter}")
+                serializer(converter_data)
             except Exception as error:
                 logger.error(
                     "Serializing model element failed. %s", error.args[0]
                 )
                 converter_data.work_item = None
+                return None  # Force to not overwrite on failure
+
+        assert converter_data.work_item is not None
         old = self.capella_polarion_mapping.get_work_item_by_capella_uuid(uuid)
-        if not nothing and old:
-            work_item.id = old.id
-        return work_item
+        if old:
+            converter_data.work_item.id = old.id
+        return converter_data.work_item
 
     def _diagram(
         self, converter_data: data_session.ConverterData
@@ -183,24 +182,27 @@ def _diagram(
         diagram_path = self.diagram_cache_path / f"{diag.uuid}.svg"
         src = _decode_diagram(diagram_path)
         description = _generate_image_html(src)
-        return data_models.CapellaWorkItem(
+        converter_data.work_item = data_models.CapellaWorkItem(
             type=converter_data.type_config.p_type,
             title=diag.name,
             description_type="text/html",
             description=description,
             status="open",
             uuid_capella=diag.uuid,
         )
+        return converter_data.work_item
 
     def _generic_work_item(
         self, converter_data: data_session.ConverterData
     ) -> data_models.CapellaWorkItem:
         obj = converter_data.capella_element
-        raw_description = getattr(obj, "description", markupsafe.Markup(""))
-        uuids, value = self._sanitize_description(obj, raw_description)
+        raw_description = getattr(obj, "description", None)
+        uuids, value = self._sanitize_description(
+            obj, raw_description or markupsafe.Markup("")
+        )
         converter_data.description_references = uuids
         requirement_types = _get_requirement_types_text(obj)
-        return data_models.CapellaWorkItem(
+        converter_data.work_item = data_models.CapellaWorkItem(
             type=converter_data.type_config.p_type,
             title=obj.name,
             description_type="text/html",
@@ -209,6 +211,7 @@ def _generic_work_item(
             uuid_capella=obj.uuid,
             **requirement_types,
         )
+        return converter_data.work_item
 
     def _sanitize_description(
         self, obj: common.GenericElement, descr: markupsafe.Markup
@@ -286,18 +289,19 @@ def get_condition(cap: PrePostConditionElement, name: str) -> str:
         def matcher(match: re.Match) -> str:
             return strike_through(self._replace_markup(match, []))
 
-        work_item = self._generic_work_item(converter_data)
         pre_condition = RE_DESCR_DELETED_PATTERN.sub(
             matcher, get_condition(obj, "precondition")
         )
         post_condition = RE_DESCR_DELETED_PATTERN.sub(
             matcher, get_condition(obj, "postcondition")
         )
 
-        work_item.preCondition = _condition(True, pre_condition)
-        work_item.postCondition = _condition(True, post_condition)
-
-        return work_item
+        assert converter_data.work_item, "No work item set yet"
+        converter_data.work_item.preCondition = _condition(True, pre_condition)
+        converter_data.work_item.postCondition = _condition(
+            True, post_condition
+        )
+        return converter_data.work_item
 
     def _get_linked_text(
         self, converter_data: data_session.ConverterData
@@ -317,31 +321,33 @@ def _linked_text_as_description(
         self, converter_data: data_session.ConverterData
     ) -> data_models.CapellaWorkItem:
         """Return attributes for a ``Constraint``."""
-        work_item = self._generic_work_item(converter_data)
-        # pylint: disable-next=attribute-defined-outside-init
-        work_item.description = self._get_linked_text(converter_data)
-        return work_item
+        assert converter_data.work_item, "No work item set yet"
+        converter_data.work_item.description = self._get_linked_text(
+            converter_data
+        )
+        return converter_data.work_item
 
     def _add_context_diagram(
         self, converter_data: data_session.ConverterData
     ) -> data_models.CapellaWorkItem:
         """Add a new custom field context diagram."""
-        work_item = self._generic_work_item(converter_data)
+        assert converter_data.work_item, "No work item set yet"
         diagram = converter_data.capella_element.context_diagram
-        work_item.additional_attributes["context_diagram"] = {
+        converter_data.work_item.additional_attributes["context_diagram"] = {
             "type": "text/html",
             "value": _generate_image_html(diagram.as_datauri_svg),
         }
-        return work_item
+        return converter_data.work_item
 
     def _add_tree_diagram(
         self, converter_data: data_session.ConverterData
     ) -> data_models.CapellaWorkItem:
         """Add a new custom field tree diagram."""
-        work_item = self._generic_work_item(converter_data)
+        assert converter_data.work_item, "No work item set yet"
         diagram = converter_data.capella_element.tree_view
-        work_item.additional_attributes["tree_view"] = {
+        converter_data.work_item.additional_attributes["tree_view"] = {
             "type": "text/html",
             "value": _generate_image_html(diagram.as_datauri_svg),
         }
-        return work_item
+        return converter_data.work_item
22 changes: 0 additions & 22 deletions capella2polarion/data_models.py
@@ -20,25 +20,3 @@ class Condition(t.TypedDict):
     uuid_capella: str
     preCondition: Condition | None
     postCondition: Condition | None
-
-    def __add__(self, other: CapellaWorkItem) -> CapellaWorkItem:
-        """Add a CapellaWorkItem to this one."""
-        if not isinstance(other, CapellaWorkItem):
-            raise TypeError("Can only merge WorkItems")
-
-        merged_data: dict[str, t.Any] = {}
-        self_dict = self.to_dict()
-        other_dict = other.to_dict()
-        for key in set(self_dict) | set(other_dict):
-            self_val: t.Any = self_dict.get(key)
-            other_val: t.Any = other_dict.get(key)
-
-            if isinstance(self_val, list) and isinstance(other_val, list):
-                merged_data[key] = self_val + other_val
-            elif isinstance(self_val, dict) and isinstance(other_val, dict):
-                merged_data[key] = {**self_val, **other_val}
-            else:
-                merged_data[key] = (
-                    other_val if other_val is not None else self_val
-                )
-        return CapellaWorkItem(**merged_data)
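For reference, the removed `__add__` merged two work items field by field: lists were concatenated, dicts were merged, and otherwise the right-hand value won unless it was `None`. A standalone sketch of that dropped behaviour, using plain dicts in place of `CapellaWorkItem.to_dict()` output:

```python
# Sketch of the merge semantics removed above: lists concatenate, dicts
# merge, and otherwise the right-hand value wins unless it is None.
import typing as t


def merge(
    self_dict: dict[str, t.Any], other_dict: dict[str, t.Any]
) -> dict[str, t.Any]:
    merged: dict[str, t.Any] = {}
    for key in set(self_dict) | set(other_dict):
        self_val = self_dict.get(key)
        other_val = other_dict.get(key)
        if isinstance(self_val, list) and isinstance(other_val, list):
            merged[key] = self_val + other_val
        elif isinstance(self_val, dict) and isinstance(other_val, dict):
            merged[key] = {**self_val, **other_val}
        else:
            merged[key] = other_val if other_val is not None else self_val
    return merged


print(merge({"title": "A", "tags": [1]}, {"title": None, "tags": [2]}))
# e.g. {'title': 'A', 'tags': [1, 2]} (key order may vary)
```

With the new single-work-item flow in `element_converter.py`, nothing needs to be merged anymore, so this operator could be dropped.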
