From 398a536e89aa5228bc5f966793d89b44b69c31c8 Mon Sep 17 00:00:00 2001 From: Michael Harbarth Date: Tue, 3 Sep 2024 09:41:32 +0200 Subject: [PATCH] refactor: refactor text work item generation --- .../connectors/polarion_worker.py | 20 +-- .../converters/document_config.py | 17 +-- .../converters/document_renderer.py | 142 ++++-------------- .../converters/polarion_html_helper.py | 17 ++- .../converters/text_work_item_provider.py | 122 +++++++++++++++ capella2polarion/data_models.py | 21 ++- tests/test_documents.py | 1 - 7 files changed, 188 insertions(+), 152 deletions(-) create mode 100644 capella2polarion/converters/text_work_item_provider.py diff --git a/capella2polarion/connectors/polarion_worker.py b/capella2polarion/connectors/polarion_worker.py index 763f2db5..a1a348eb 100644 --- a/capella2polarion/connectors/polarion_worker.py +++ b/capella2polarion/connectors/polarion_worker.py @@ -15,11 +15,7 @@ from capella2polarion import data_models from capella2polarion.connectors import polarion_repo -from capella2polarion.converters import ( - data_session, - document_config, - polarion_html_helper, -) +from capella2polarion.converters import data_session, polarion_html_helper logger = logging.getLogger(__name__) @@ -496,17 +492,13 @@ def _process_document_datas(self, client, document_datas): for document_data in document_datas: headings += document_data.headings documents.append(document_data.document) - if document_data.text_work_items: - text_work_item_type = next( - iter(document_data.text_work_items.values()) - ).type + if document_data.text_work_item_provider.new_text_work_items: self._create_and_update_text_work_items( - document_data.text_work_items, client + document_data.text_work_item_provider.new_text_work_items, + client, ) - polarion_html_helper.insert_text_work_items( + document_data.text_work_item_provider.insert_text_work_items( document_data.document, - document_data.text_work_items, - text_work_item_type, ) return documents, headings @@ -526,7 +518,7 @@ def get_document( def load_polarion_documents( self, - document_infos: t.Iterable[document_config.DocumentInfo], + document_infos: t.Iterable[data_models.DocumentInfo], ) -> dict[ tuple[str | None, str, str], tuple[polarion_api.Document | None, list[polarion_api.WorkItem]], diff --git a/capella2polarion/converters/document_config.py b/capella2polarion/converters/document_config.py index be1df534..6a936666 100644 --- a/capella2polarion/converters/document_config.py +++ b/capella2polarion/converters/document_config.py @@ -1,7 +1,6 @@ # Copyright DB InfraGO AG and contributors # SPDX-License-Identifier: Apache-2.0 """Module with classes and a loader for document rendering configs.""" -import dataclasses import logging import pathlib import typing as t @@ -12,22 +11,12 @@ import pydantic import yaml +from capella2polarion import data_models from capella2polarion.converters import polarion_html_helper logger = logging.getLogger(__name__) -@dataclasses.dataclass -class DocumentInfo: - """Class for information regarding a document which should be created.""" - - project_id: str | None - module_folder: str - module_name: str - text_work_item_type: str - text_work_item_id_field: str - - class WorkItemLayout(pydantic.BaseModel): """Configuration for rendering layouts of work items.""" @@ -93,11 +82,11 @@ class DocumentConfigs(pydantic.BaseModel): pydantic.Field(default_factory=list) ) - def iterate_documents(self) -> t.Iterator[DocumentInfo]: + def iterate_documents(self) -> t.Iterator[data_models.DocumentInfo]: """Yield all document paths of the config as tuples.""" for conf in self.full_authority + self.mixed_authority: for inst in conf.instances: - yield DocumentInfo( + yield data_models.DocumentInfo( project_id=conf.project_id, module_folder=inst.polarion_space, module_name=inst.polarion_name, diff --git a/capella2polarion/converters/document_renderer.py b/capella2polarion/converters/document_renderer.py index 6b2ccb25..a0c1766a 100644 --- a/capella2polarion/converters/document_renderer.py +++ b/capella2polarion/converters/document_renderer.py @@ -18,6 +18,7 @@ from .. import data_models from . import document_config, polarion_html_helper +from . import text_work_item_provider as twi logger = logging.getLogger(__name__) @@ -169,10 +170,9 @@ def render_document( heading_numbering: bool = False, rendering_layouts: list[polarion_api.RenderingLayout] | None = None, *, - text_work_item_identifier: str = polarion_html_helper.TEXT_WORK_ITEM_ID_FIELD, - text_work_item_type: str = polarion_html_helper.TEXT_WORK_ITEM_TYPE, + text_work_item_provider: twi.TextWorkItemProvider | None = None, **kwargs: t.Any, - ): + ) -> data_models.DocumentData: """Render a new Polarion document.""" @t.overload @@ -182,11 +182,9 @@ def render_document( template_name: str, *, document: polarion_api.Document, - text_work_items: dict[str, polarion_api.WorkItem], - text_work_item_identifier: str = polarion_html_helper.TEXT_WORK_ITEM_ID_FIELD, - text_work_item_type: str = polarion_html_helper.TEXT_WORK_ITEM_TYPE, + text_work_item_provider: twi.TextWorkItemProvider | None = None, **kwargs: t.Any, - ): + ) -> data_models.DocumentData: """Update an existing Polarion document.""" def render_document( @@ -199,13 +197,13 @@ def render_document( heading_numbering: bool = False, rendering_layouts: list[polarion_api.RenderingLayout] | None = None, document: polarion_api.Document | None = None, - text_work_items: dict[str, polarion_api.WorkItem] | None = None, - text_work_item_identifier: str = polarion_html_helper.TEXT_WORK_ITEM_ID_FIELD, - text_work_item_type: str = polarion_html_helper.TEXT_WORK_ITEM_TYPE, + text_work_item_provider: twi.TextWorkItemProvider | None = None, **kwargs: t.Any, - ): + ) -> data_models.DocumentData: """Render a Polarion document.""" - text_work_items = text_work_items or {} + text_work_item_provider = ( + text_work_item_provider or twi.TextWorkItemProvider() + ) if document is not None: polarion_folder = document.module_folder polarion_name = document.module_name @@ -238,11 +236,8 @@ def render_document( rendering_result = template.render( model=self.model, session=session, **kwargs ) - new_text_work_items = self._extract_text_work_items( + text_work_item_provider.generate_text_work_items( lxmlhtml.fragments_fromstring(rendering_result), - text_work_items, - text_work_item_type, - text_work_item_identifier, ) document.home_page_content = polarion_api.TextContent( @@ -252,7 +247,7 @@ def render_document( document.rendering_layouts = session.rendering_layouts return data_models.DocumentData( - document, session.headings, new_text_work_items + document, session.headings, text_work_item_provider ) def update_mixed_authority_document( @@ -262,11 +257,12 @@ def update_mixed_authority_document( sections: dict[str, str], global_parameters: dict[str, t.Any], section_parameters: dict[str, dict[str, t.Any]], - text_work_items: dict[str, polarion_api.WorkItem], - text_work_item_identifier: str = polarion_html_helper.TEXT_WORK_ITEM_ID_FIELD, - text_work_item_type: str = polarion_html_helper.TEXT_WORK_ITEM_TYPE, - ): + text_work_item_provider: twi.TextWorkItemProvider | None = None, + ) -> data_models.DocumentData: """Update a mixed authority document.""" + text_work_item_provider = ( + text_work_item_provider or twi.TextWorkItemProvider() + ) assert ( document.home_page_content and document.home_page_content.value ), "In mixed authority the document must have content" @@ -281,7 +277,6 @@ def update_mixed_authority_document( env = self._get_jinja_env(template_folder) new_content = [] - new_text_work_items = {} last_section_end = 0 for section_name, area in section_areas.items(): @@ -310,19 +305,9 @@ def update_mixed_authority_document( work_item_ids = polarion_html_helper.extract_work_items( current_content ) - section_text_work_items = { - text_id: work_item - for text_id, work_item in text_work_items.items() - if work_item.id in work_item_ids - } html_fragments = lxmlhtml.fragments_fromstring(content) - new_text_work_items.update( - self._extract_text_work_items( - html_fragments, - section_text_work_items, - text_work_item_type, - text_work_item_identifier, - ) + text_work_item_provider.generate_text_work_items( + html_fragments, work_item_ids ) new_content += html_fragments @@ -341,7 +326,7 @@ def update_mixed_authority_document( document.rendering_layouts = session.rendering_layouts return data_models.DocumentData( - document, session.headings, new_text_work_items + document, session.headings, text_work_item_provider ) def _get_and_customize_doc( @@ -387,25 +372,6 @@ def render_documents( return self.projects - def _make_text_work_item_mapping( - self, - work_items: list[polarion_api.WorkItem], - text_work_item_field_id: str, - ) -> dict[str, polarion_api.WorkItem]: - result = {} - for work_item in work_items: - # We only use those work items which have an ID defined by us - if text_id := work_item.additional_attributes.get( - text_work_item_field_id - ): - if text_id in result: - raise ValueError( - f"There are multiple text work items with {text_work_item_field_id} == {text_id}" - ) - - result[text_id] = work_item - return result - def _check_document_status( self, document: polarion_api.Document, @@ -448,6 +414,11 @@ def _render_mixed_authority_documents( rendering_layouts, config.heading_numbering, ) + text_work_item_provider = twi.TextWorkItemProvider( + config.text_work_item_id_field, + config.text_work_item_type, + text_work_items, + ) if old_doc is None: logger.error( "For document %s/%s no document was found, but it's " @@ -467,11 +438,7 @@ def _render_mixed_authority_documents( config.sections, instance.params, instance.section_params, - self._make_text_work_item_mapping( - text_work_items, config.text_work_item_id_field - ), - config.text_work_item_id_field, - config.text_work_item_type, + text_work_item_provider, ) except Exception as e: logger.error( @@ -505,6 +472,11 @@ def _render_full_authority_documents( rendering_layouts, config.heading_numbering, ) + text_work_item_provider = twi.TextWorkItemProvider( + config.text_work_item_id_field, + config.text_work_item_type, + text_work_items, + ) if old_doc: if not self._check_document_status(old_doc, config): continue @@ -514,11 +486,7 @@ def _render_full_authority_documents( config.template_directory, config.template, document=old_doc, - text_work_items=self._make_text_work_item_mapping( - text_work_items, config.text_work_item_id_field - ), - text_work_item_identifier=config.text_work_item_id_field, - text_work_item_type=config.text_work_item_type, + text_work_item_provider=text_work_item_provider, **instance.params, ) except Exception as e: @@ -542,8 +510,7 @@ def _render_full_authority_documents( instance.polarion_title, config.heading_numbering, rendering_layouts, - text_work_item_identifier=config.text_work_item_id_field, - text_work_item_type=config.text_work_item_type, + text_work_item_provider=text_work_item_provider, **instance.params, ) except Exception as e: @@ -603,46 +570,3 @@ def _extract_section_areas(self, html_elements: list[etree._Element]): current_area_id = None current_area_start = None return section_areas - - def _extract_text_work_items( - self, - content: list[lxmlhtml.HtmlElement], - text_work_items: dict[str, polarion_api.WorkItem], - text_work_item_type: str, - field_id: str, - ) -> dict[str, polarion_api.WorkItem]: - work_items: dict[str, polarion_api.WorkItem] = {} - for element in content: - if element.tag != polarion_html_helper.WORK_ITEM_TAG: - continue - - if not (text_id := element.get("id")): - raise ValueError("All work items must have an ID in template") - - work_item = text_work_items.pop( - text_id, - polarion_api.WorkItem( - type=text_work_item_type, - title="", - status="open", - additional_attributes={field_id: text_id}, - ), - ) - work_item.description_type = "text/html" - inner_content = "".join( - [ - ( - lxmlhtml.tostring(child, encoding="unicode") - if isinstance(child, lxmlhtml.HtmlElement) - else child - ) - for child in element.iterchildren() - ] - ) - if element.text: - inner_content = element.text + inner_content - - work_item.description = inner_content - work_items[text_id] = work_item - - return work_items diff --git a/capella2polarion/converters/polarion_html_helper.py b/capella2polarion/converters/polarion_html_helper.py index f24a7913..9c69cb18 100644 --- a/capella2polarion/converters/polarion_html_helper.py +++ b/capella2polarion/converters/polarion_html_helper.py @@ -123,7 +123,7 @@ def remove_table_ids( time the REST-API does not allow posting or patching a document with multiple tables having the same ID. """ - html_fragments = _ensure_fragments(html_content) + html_fragments = ensure_fragments(html_content) for element in html_fragments: if element.tag == "table": @@ -132,26 +132,27 @@ def remove_table_ids( return html_fragments -def _ensure_fragments( - html_content: str | list[html.HtmlComment], -) -> list[html.HtmlComment]: +def ensure_fragments( + html_content: str | list[html.HtmlElement], +) -> list[html.HtmlElement]: + """Convert string to html elements.""" if isinstance(html_content, str): return html.fragments_fromstring(html_content) return html_content -def extract_headings(html_content: str | list[html.HtmlComment]) -> list[str]: +def extract_headings(html_content: str | list[html.HtmlElement]) -> list[str]: """Return a list of work item IDs for all headings in the given content.""" return extract_work_items(html_content, h_regex) def extract_work_items( - html_content: str | list[html.HtmlComment], + html_content: str | list[html.HtmlElement], tag_regex: re.Pattern | None = None, ) -> list[str]: """Return a list of work item IDs for work items in the given content.""" work_items = [] - html_fragments = _ensure_fragments(html_content) + html_fragments = ensure_fragments(html_content) for element in html_fragments: if isinstance(element, html.HtmlComment): continue @@ -177,7 +178,7 @@ def insert_text_work_items( layout_index = get_layout_index( "paragraph", document.rendering_layouts, text_work_item_type ) - html_fragments = _ensure_fragments(document.home_page_content.value) + html_fragments = ensure_fragments(document.home_page_content.value) new_content = [] last_match = -1 for index, element in enumerate(html_fragments): diff --git a/capella2polarion/converters/text_work_item_provider.py b/capella2polarion/converters/text_work_item_provider.py new file mode 100644 index 00000000..cb74ae4f --- /dev/null +++ b/capella2polarion/converters/text_work_item_provider.py @@ -0,0 +1,122 @@ +# Copyright DB InfraGO AG and contributors +# SPDX-License-Identifier: Apache-2.0 +"""Provides a class to generate and inset text work items in documents.""" +import polarion_rest_api_client as polarion_api +from lxml import html + +from capella2polarion.converters import polarion_html_helper + + +class TextWorkItemProvider: + """Class providing text work items, their generation and insertion.""" + + def __init__( + self, + text_work_item_id_field: str = polarion_html_helper.TEXT_WORK_ITEM_ID_FIELD, + text_work_item_type: str = polarion_html_helper.TEXT_WORK_ITEM_TYPE, + existing_text_work_items: list[polarion_api.WorkItem] | None = None, + ): + self.old_text_work_items: dict[str, polarion_api.WorkItem] = {} + for work_item in existing_text_work_items or []: + # We only use those work items which have an ID defined by us + if text_id := work_item.additional_attributes.get( + text_work_item_id_field + ): + if text_id in self.old_text_work_items: + raise ValueError( + f"There are multiple text work items with " + f"{text_work_item_id_field} == {text_id}" + ) + + self.old_text_work_items[text_id] = work_item + + self.text_work_item_id_field = text_work_item_id_field + self.text_work_item_type = text_work_item_type + self.new_text_work_items: dict[str, polarion_api.WorkItem] = {} + + def generate_text_work_items( + self, + content: list[html.HtmlElement] | str, + work_item_id_filter: list[str] | None = None, + ): + """Generate text work items from the provided html.""" + content = polarion_html_helper.ensure_fragments(content) + for element in content: + if element.tag != polarion_html_helper.WORK_ITEM_TAG: + continue + + if not (text_id := element.get("id")): + raise ValueError("All work items must have an ID in template") + + if ( + work_item_id_filter is None or text_id in work_item_id_filter + ) and text_id in self.old_text_work_items: + work_item = self.old_text_work_items[text_id] + else: + work_item = polarion_api.WorkItem( + type=self.text_work_item_type, + title="", + status="open", + additional_attributes={ + self.text_work_item_id_field: text_id + }, + ) + + work_item.description_type = "text/html" + inner_content = "".join( + [ + ( + html.tostring(child, encoding="unicode") + if isinstance(child, html.HtmlElement) + else child + ) + for child in element.iterchildren() + ] + ) + if element.text: + inner_content = element.text + inner_content + + work_item.description = inner_content + self.new_text_work_items[text_id] = work_item + + def insert_text_work_items( + self, + document: polarion_api.Document, + ): + """Insert text work items into the given document.""" + if not self.new_text_work_items: + return + + assert document.home_page_content is not None + layout_index = polarion_html_helper.get_layout_index( + "paragraph", document.rendering_layouts, self.text_work_item_type + ) + html_fragments = polarion_html_helper.ensure_fragments( + document.home_page_content.value + ) + new_content = [] + last_match = -1 + for index, element in enumerate(html_fragments): + if isinstance(element, html.HtmlComment): + continue + + if element.tag == "workitem": + new_content += html_fragments[last_match + 1 : index] + last_match = index + if work_item := self.new_text_work_items.get( + element.get("id") + ): + new_content.append( + html.fromstring( + polarion_html_helper.POLARION_WORK_ITEM_DOCUMENT.format( + pid=work_item.id, + lid=layout_index, + custom_info="", + ) + ) + ) + + new_content += html_fragments[last_match + 1 :] + document.home_page_content.value = "\n".join( + [html.tostring(element).decode("utf-8") for element in new_content] + ) diff --git a/capella2polarion/data_models.py b/capella2polarion/data_models.py index 9ef8153a..b6dcd3c1 100644 --- a/capella2polarion/data_models.py +++ b/capella2polarion/data_models.py @@ -11,6 +11,8 @@ import polarion_rest_api_client as polarion_api +from capella2polarion.converters import text_work_item_provider + class CapellaWorkItem(polarion_api.WorkItem): """A WorkItem class with additional Capella related attributes.""" @@ -68,9 +70,16 @@ class DocumentData: """A class to store data related to a rendered document.""" document: polarion_api.Document - headings: list[polarion_api.WorkItem] = dataclasses.field( - default_factory=list - ) - text_work_items: dict[str, polarion_api.WorkItem] = dataclasses.field( - default_factory=dict - ) + headings: list[polarion_api.WorkItem] + text_work_item_provider: text_work_item_provider.TextWorkItemProvider + + +@dataclasses.dataclass +class DocumentInfo: + """Class for information regarding a document which should be created.""" + + project_id: str | None + module_folder: str + module_name: str + text_work_item_type: str + text_work_item_id_field: str diff --git a/tests/test_documents.py b/tests/test_documents.py index 47f33569..fb55d68f 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -231,7 +231,6 @@ def test_mixed_authority_document( "global_param": "Overwrite global param", }, }, - {}, ) content: list[etree._Element] = html.fromstring(