From 20669bde8a20be838904b9368a9c688f8876cf49 Mon Sep 17 00:00:00 2001 From: Leon Date: Tue, 5 Sep 2023 09:42:04 +0200 Subject: [PATCH] Fix/nsmap copy bug (#245) ## Types of changes - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Checklist: - [x] I have updated the [changelog](../CHANGELOG.md) accordingly. - [x] I have added tests to cover my changes. --- CHANGELOG.md | 1 + examples/requestmanipulator.py | 4 +- pyproject.toml | 2 +- src/sdc11073/consumer/consumerimpl.py | 11 +- src/sdc11073/consumer/manipulator.py | 4 +- src/sdc11073/consumer/subscription.py | 5 +- src/sdc11073/mdib/containerbase.py | 15 +- src/sdc11073/mdib/descriptorcontainers.py | 9 +- src/sdc11073/mdib/mdibbase.py | 13 +- src/sdc11073/mdib/statecontainers.py | 11 +- src/sdc11073/provider/dpwshostedservice.py | 3 +- .../porttypes/descriptioneventserviceimpl.py | 4 +- .../provider/porttypes/porttypebase.py | 13 +- src/sdc11073/provider/subscriptionmgr.py | 3 +- .../provider/subscriptionmgr_async.py | 7 +- src/sdc11073/provider/subscriptionmgr_base.py | 8 +- src/sdc11073/pysoap/msgfactory.py | 3 +- src/sdc11073/pysoap/msgreader.py | 23 +- src/sdc11073/pysoap/soapenvelope.py | 17 +- src/sdc11073/xml_types/addressing_types.py | 9 +- src/sdc11073/xml_types/basetypes.py | 9 +- src/sdc11073/xml_types/pm_types.py | 17 +- src/sdc11073/xml_types/xml_structure.py | 104 +++---- src/sdc11073/xml_utils.py | 57 ++++ tests/test_xml_utils.py | 276 ++++++++++++++++++ 25 files changed, 488 insertions(+), 140 deletions(-) create mode 100644 src/sdc11073/xml_utils.py create mode 100644 tests/test_xml_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a32ae457..217adbed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - fixed a bug where the `SdcConsumer` failed to determine the host network adapter if the ip contained in the `device_location` is on a different subnet - comparison of extensions would fail [#238](https://github.com/Draegerwerk/sdc11073/issues/238) - ExtensionLocalValue.value must be a list instead of a dictionary in order to allow multiple elements with same name. +- fixed a bug where namespaces of xml are lost when coping lxml elements ### Changed diff --git a/examples/requestmanipulator.py b/examples/requestmanipulator.py index 61e447c8..31b71a83 100644 --- a/examples/requestmanipulator.py +++ b/examples/requestmanipulator.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, Union if TYPE_CHECKING: - from lxml.etree import ElementBase + from sdc11073 import xml_utils from sdc11073.pysoap.soapenvelope import Soap12Envelope @@ -26,7 +26,7 @@ def manipulate_soapenvelope(self, soap_envelope: Soap12Envelope) -> Union[Soap12 return self.cb_soapenvelope(soap_envelope) return None - def manipulate_domtree(self, domtree: ElementBase) -> Union[ElementBase, None]: + def manipulate_domtree(self, domtree: xml_utils.LxmlElement) -> Union[xml_utils.LxmlElement, None]: if callable(self.cb_xml): return self.cb_xml(domtree) return None diff --git a/pyproject.toml b/pyproject.toml index a0a6303e..1d9b5909 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -172,7 +172,7 @@ max-doc-length = 120 # https://beta.ruff.rs/docs/settings/#max-doc-length "__init__.py" = ["D104"] [tool.mypy] -python_version = 3.9 # https://mypy.readthedocs.io/en/stable/config_file.html#confval-python_version +python_version = "3.9" # https://mypy.readthedocs.io/en/stable/config_file.html#confval-python_version strict = true # https://mypy.readthedocs.io/en/stable/config_file.html#confval-strict disallow_untyped_calls = true # https://mypy.readthedocs.io/en/stable/config_file.html#confval-disallow_untyped_calls disallow_untyped_defs = true # https://mypy.readthedocs.io/en/stable/config_file.html#confval-disallow_untyped_defs diff --git a/src/sdc11073/consumer/consumerimpl.py b/src/sdc11073/consumer/consumerimpl.py index fad12fb2..caea12b6 100644 --- a/src/sdc11073/consumer/consumerimpl.py +++ b/src/sdc11073/consumer/consumerimpl.py @@ -3,7 +3,6 @@ import copy import ssl -import time import traceback import uuid from dataclasses import dataclass @@ -13,7 +12,7 @@ from lxml import etree as etree_ import sdc11073.certloader -from sdc11073 import commlog, loghelper, network +from sdc11073 import commlog, loghelper, network, xml_utils from sdc11073 import observableproperties as properties from sdc11073.definitions_base import ProtocolsRegistry from sdc11073.dispatch import DispatchKey, MessageConverterMiddleware @@ -30,7 +29,6 @@ from .request_handler_deferred import EmptyResponse if TYPE_CHECKING: - from ssl import SSLContext from collections.abc import Iterable from sdc11073.dispatch.request import RequestData from sdc11073.xml_types.mex_types import HostedServiceType @@ -41,7 +39,6 @@ from sdc11073.mdib.consumermdib import ConsumerMdib from sdc11073.consumer.serviceclients.serviceclientbase import HostedServiceClient from sdc11073.consumer.subscription import ConsumerSubscriptionManagerProtocol - from sdc11073.loghelper import LoggerAdapter from sdc11073.wsdiscovery.service import Service from .components import SdcConsumerComponents @@ -65,7 +62,7 @@ def __init__(self, service_id: str, # noqa: PLR0913 self.log_prefix = log_prefix self.meta_data: mex_types.Metadata | None = None self.wsdl_string = None - self.wsdl_node: etree_.ElementBase | None = None + self.wsdl_node: xml_utils.LxmlElement | None = None self._logger = loghelper.get_logger_adapter('sdc.client.hosted', log_prefix) self._url = urlparse(endpoint_address) self.services = {} @@ -341,7 +338,7 @@ def do_subscribe(self, dpws_hosted: HostedServiceType, # noqa: PLR0913 filter_type: eventing_types.FilterType, actions: Iterable[DispatchKey], expire_minutes: int = 60, - any_elements: list | None = None, + any_elements: list[xml_utils.LxmlElement] | None = None, any_attributes: dict | None = None) -> ConsumerSubscription: """Send subscribe request to provider. @@ -350,7 +347,7 @@ def do_subscribe(self, dpws_hosted: HostedServiceType, # noqa: PLR0913 :param filter_type: the filter that is sent to device :param actions: a list of DispatchKeys that this subscription shall handle :param expire_minutes: defaults to 1 hour - :param any_elements: optional list of etree.ElementBase objects + :param any_elements: optional list of lxml elements :param any_attributes: optional dictionary of name:str - value:str pairs :return: a subscription object that has callback already registered. """ diff --git a/src/sdc11073/consumer/manipulator.py b/src/sdc11073/consumer/manipulator.py index 7363ec8d..2c5eb65f 100644 --- a/src/sdc11073/consumer/manipulator.py +++ b/src/sdc11073/consumer/manipulator.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, Protocol if TYPE_CHECKING: - from lxml.etree import ElementBase + from sdc11073 import xml_utils from sdc11073.pysoap.soapenvelope import Soap12Envelope @@ -20,7 +20,7 @@ class RequestManipulatorProtocol(Protocol): def manipulate_soapenvelope(self, soap_envelope: Soap12Envelope) -> Soap12Envelope | None: """Manipulate on Soap12Envelope level.""" - def manipulate_domtree(self, domtree: ElementBase) -> ElementBase | None: + def manipulate_domtree(self, domtree: xml_utils.LxmlElement) -> xml_utils.LxmlElement | None: """Manipulate on etree.Element level.""" def manipulate_string(self, xml_string: str) -> str | None: diff --git a/src/sdc11073/consumer/subscription.py b/src/sdc11073/consumer/subscription.py index a96f8561..23b72f96 100644 --- a/src/sdc11073/consumer/subscription.py +++ b/src/sdc11073/consumer/subscription.py @@ -28,6 +28,7 @@ from sdc11073.pysoap.soapclient import SoapClientProtocol from sdc11073.xml_types.eventing_types import FilterType from sdc11073.xml_types.mex_types import HostedServiceType + from sdc11073 import xml_utils class ConsumerSubscriptionProtocol(Protocol): @@ -90,9 +91,9 @@ def __init__(self, msg_factory: MessageFactory, # noqa: PLR0913 self.requested_expires: int | float = 0 self.granted_expires: int | float = 0 - self.notify_to_identifier: etree_.ElementBase | None = None + self.notify_to_identifier: xml_utils.LxmlElement | None = None - self.end_to_identifier: etree_.ElementBase | None = None + self.end_to_identifier: xml_utils.LxmlElement | None = None self._logger = loghelper.get_logger_adapter('sdc.client.subscr', log_prefix) self.event_counter = 0 # for display purpose, we count notifications diff --git a/src/sdc11073/mdib/containerbase.py b/src/sdc11073/mdib/containerbase.py index 4a6e9576..e31e2cf7 100644 --- a/src/sdc11073/mdib/containerbase.py +++ b/src/sdc11073/mdib/containerbase.py @@ -4,10 +4,11 @@ import inspect from typing import Any -from lxml.etree import Element, ElementBase, QName +from lxml.etree import Element, QName from sdc11073 import observableproperties as properties from sdc11073.namespaces import QN_TYPE, NamespaceHelper +from sdc11073 import xml_utils class ContainerBase: @@ -33,7 +34,7 @@ def get_actual_value(self, attr_name: str) -> Any: """Ignore default value and implied value, e.g. return None if value is not present in xml.""" return getattr(self.__class__, attr_name).get_actual_value(self) - def mk_node(self, tag: QName, ns_helper: NamespaceHelper, set_xsi_type: bool = False) -> ElementBase: + def mk_node(self, tag: QName, ns_helper: NamespaceHelper, set_xsi_type: bool = False) -> xml_utils.LxmlElement: """Create an etree node from instance data. :param tag: tag of the newly created node @@ -48,9 +49,9 @@ def mk_node(self, tag: QName, ns_helper: NamespaceHelper, set_xsi_type: bool = F self.update_node(node, ns_helper, set_xsi_type) return node - def update_node(self, node: ElementBase, + def update_node(self, node: xml_utils.LxmlElement, ns_helper: NamespaceHelper, - set_xsi_type: bool = False) -> ElementBase: + set_xsi_type: bool = False) -> xml_utils.LxmlElement: """Update node with own data. :param node: node to be updated @@ -64,7 +65,7 @@ def update_node(self, node: ElementBase, prop.update_xml_value(self, node) return node - def update_from_node(self, node: ElementBase): + def update_from_node(self, node: xml_utils.LxmlElement): """Update members from node.""" for _, cprop in self.sorted_container_properties(): cprop.update_from_node(self, node) @@ -82,8 +83,8 @@ def _update_from_other(self, other_container: ContainerBase, skipped_properties: def mk_copy(self, copy_node: bool = True) -> ContainerBase: """Make a copy of self.""" copied = copy.copy(self) - if copy_node: - copied.node = copy.deepcopy(self.node) + if copy_node and self.node is not None: + copied.node = xml_utils.copy_element(self.node) return copied def sorted_container_properties(self) -> list: diff --git a/src/sdc11073/mdib/descriptorcontainers.py b/src/sdc11073/mdib/descriptorcontainers.py index de681258..4f4398f9 100644 --- a/src/sdc11073/mdib/descriptorcontainers.py +++ b/src/sdc11073/mdib/descriptorcontainers.py @@ -18,6 +18,7 @@ from decimal import Decimal from lxml import etree as etree_ + from sdc11073 import xml_utils from sdc11073.namespaces import NamespaceHelper from sdc11073.xml_types.isoduration import DurationType @@ -86,7 +87,7 @@ def set_source_mds(self, handle: str): ... @classmethod - def from_node(cls, node: etree_.ElementBase, parent_handle: str | None = None) -> AbstractDescriptorProtocol: + def from_node(cls, node: xml_utils.LxmlElement, parent_handle: str | None = None) -> AbstractDescriptorProtocol: """Create class and init its properties from the node.""" @@ -201,7 +202,7 @@ def diff(self, other: AbstractDescriptorContainer, ignore_property_names: list[s return None if len(ret) == 0 else ret def mk_descriptor_node(self, tag: etree_.QName, - ns_helper: NamespaceHelper, set_xsi_type: bool = True) -> etree_.ElementBase: + ns_helper: NamespaceHelper, set_xsi_type: bool = True) -> xml_utils.LxmlElement: """Create a lxml etree node from instance data. :param tag: tag of node @@ -226,7 +227,7 @@ def tag_name_for_child_descriptor(self, node_type: etree_.QName) -> (etree_.QNam return child.child_qname, set_xsi_type raise ValueError(f'{node_type} not known in child declarations of {self.__class__.__name__}') - def sort_child_nodes(self, node: etree_.ElementBase) -> None: + def sort_child_nodes(self, node: xml_utils.LxmlElement) -> None: """Bring all child elements of node in correct order (BICEPS schema). raises a ValueError if a child node exist that is not listed in ordered_tags @@ -265,7 +266,7 @@ def __repr__(self) -> str: f'parent={self.parent_handle}') @classmethod - def from_node(cls, node: etree_.ElementBase, parent_handle: str | None = None) -> AbstractDescriptorContainer: + def from_node(cls, node: xml_utils.LxmlElement, parent_handle: str | None = None) -> AbstractDescriptorContainer: """Create class and init its properties from the node.""" obj = cls(handle=None, # will be determined in constructor from node value parent_handle=parent_handle) diff --git a/src/sdc11073/mdib/mdibbase.py b/src/sdc11073/mdib/mdibbase.py index bb3c6965..f16538cf 100644 --- a/src/sdc11073/mdib/mdibbase.py +++ b/src/sdc11073/mdib/mdibbase.py @@ -17,6 +17,7 @@ from sdc11073.definitions_base import BaseDefinitions from sdc11073.loghelper import LoggerAdapter from sdc11073.xml_types.pm_types import CodedValue + from sdc11073 import xml_utils from .descriptorcontainers import AbstractDescriptorContainer, AbstractOperationDescriptorContainer from .statecontainers import AbstractMultiStateContainer, AbstractStateContainer @@ -348,7 +349,7 @@ def add_state_containers(self, state_containers: list[AbstractStateContainer | A self._logger.error('add_state_containers: {}, DescriptorHandle={}; {}', # noqa: PLE1205 ex, state_container.DescriptorHandle, traceback.format_exc()) - def _reconstruct_md_description(self) -> etree_.ElementBase: + def _reconstruct_md_description(self) -> xml_utils.LxmlElement: """Build dom tree of descriptors from current data.""" pm = self.data_model.pm_names doc_nsmap = self.nsmapper.ns_map @@ -364,7 +365,7 @@ def _reconstruct_md_description(self) -> etree_.ElementBase: def make_descriptor_node(self, descriptor_container: AbstractDescriptorContainer, tag: etree_.QName, - set_xsi_type: bool = True) -> etree_.ElementBase: + set_xsi_type: bool = True) -> xml_utils.LxmlElement: """Create a lxml etree node with subtree from instance data. :param descriptor_container: a descriptor container instance @@ -387,7 +388,7 @@ def make_descriptor_node(self, descriptor_container.sort_child_nodes(node) return node - def _reconstruct_mdib(self, add_context_states: bool) -> etree_.ElementBase: + def _reconstruct_mdib(self, add_context_states: bool) -> xml_utils.LxmlElement: """Build dom tree of mdib from current data. If add_context_states is False, context states are not included. @@ -413,13 +414,13 @@ def _reconstruct_mdib(self, add_context_states: bool) -> etree_.ElementBase: md_state_node.append(state_container.mk_state_node(tag, self.nsmapper)) return mdib_node - def reconstruct_md_description(self) -> (etree_.ElementBase, MdibVersionGroup): + def reconstruct_md_description(self) -> (xml_utils.LxmlElement, MdibVersionGroup): """Build dom tree of descriptors from current data.""" with self.mdib_lock: node = self._reconstruct_md_description() return node, self.mdib_version_group - def reconstruct_mdib(self) -> (etree_.ElementBase, MdibVersionGroup): + def reconstruct_mdib(self) -> (xml_utils.LxmlElement, MdibVersionGroup): """Build dom tree from current data. This method does not include context states! @@ -427,7 +428,7 @@ def reconstruct_mdib(self) -> (etree_.ElementBase, MdibVersionGroup): with self.mdib_lock: return self._reconstruct_mdib(add_context_states=False), self.mdib_version_group - def reconstruct_mdib_with_context_states(self) -> (etree_.ElementBase, MdibVersionGroup): + def reconstruct_mdib_with_context_states(self) -> (xml_utils.LxmlElement, MdibVersionGroup): """Build dom tree from current data. This method includes the context states. diff --git a/src/sdc11073/mdib/statecontainers.py b/src/sdc11073/mdib/statecontainers.py index dad8d5b2..21ea214b 100644 --- a/src/sdc11073/mdib/statecontainers.py +++ b/src/sdc11073/mdib/statecontainers.py @@ -16,12 +16,13 @@ if TYPE_CHECKING: from decimal import Decimal - from lxml.etree import ElementBase, QName + from lxml.etree import QName from sdc11073.location import SdcLocation from sdc11073.namespaces import NamespaceHelper from sdc11073.xml_types.isoduration import DurationType from sdc11073.xml_types.xml_structure import ExtensionLocalValue + from sdc11073 import xml_utils from .descriptorcontainers import AbstractDescriptorProtocol @@ -52,7 +53,7 @@ def update_from_other_container(self, other: AbstractStateProtocol, skipped_properties: list[str] | None = None): """Copy all properties except the skipped ones to self.""" - def update_from_node(self, node: ElementBase): + def update_from_node(self, node: xml_utils.LxmlElement): """Update members from node.""" @@ -88,7 +89,7 @@ def __init__(self, descriptor_container: AbstractDescriptorProtocol): def mk_state_node(self, tag: QName, nsmapper: NamespaceHelper, - set_xsi_type: bool = True) -> ElementBase: + set_xsi_type: bool = True) -> xml_utils.LxmlElement: """Create an etree node from instance data.""" return super().mk_node(tag, nsmapper, set_xsi_type=set_xsi_type) @@ -122,7 +123,7 @@ def __repr__(self) -> str: return f'{self.__class__.__name__} DescriptorHandle="{self.DescriptorHandle}" StateVersion={self.StateVersion}' @classmethod - def from_node(cls, node: ElementBase, + def from_node(cls, node: xml_utils.LxmlElement, descriptor_container: AbstractDescriptorProtocol | None = None) -> AbstractStateContainer: """Create an instance from XML node.""" obj = cls(descriptor_container) @@ -553,7 +554,7 @@ def update_from_other_container(self, other: AbstractMultiStateContainer, skippe f'Update from a node with different handle is not possible! Have "{self.Handle}", got "{other.Handle}"') super().update_from_other_container(other, skipped_properties) - def mk_state_node(self, tag: QName, nsmapper: NamespaceHelper, set_xsi_type: bool = True) -> ElementBase: + def mk_state_node(self, tag: QName, nsmapper: NamespaceHelper, set_xsi_type: bool = True) -> xml_utils.LxmlElement: """Create an etree node from instance data.""" if self.Handle is None: self.Handle = uuid.uuid4().hex diff --git a/src/sdc11073/provider/dpwshostedservice.py b/src/sdc11073/provider/dpwshostedservice.py index 16449b9c..42084695 100644 --- a/src/sdc11073/provider/dpwshostedservice.py +++ b/src/sdc11073/provider/dpwshostedservice.py @@ -10,6 +10,7 @@ from ..xml_types import mex_types from ..xml_types.addressing_types import EndpointReferenceType from ..xml_types.dpws_types import HostedServiceType +from sdc11073 import xml_utils _wsdl_ns = ns_hlp.WSDL.namespace @@ -22,7 +23,7 @@ WSDL_S12 = ns_hlp.WSDL12.namespace # old soap 12 namespace, used in wsdl 1.1. used only for wsdl -def etree_from_file(path) -> etree_.ElementBase: +def etree_from_file(path) -> xml_utils.LxmlElement: parser = etree_.ETCompatXMLParser(resolve_entities=False) doc = etree_.parse(path, parser=parser) return doc.getroot() diff --git a/src/sdc11073/provider/porttypes/descriptioneventserviceimpl.py b/src/sdc11073/provider/porttypes/descriptioneventserviceimpl.py index 4f1fc882..6e83b5fe 100644 --- a/src/sdc11073/provider/porttypes/descriptioneventserviceimpl.py +++ b/src/sdc11073/provider/porttypes/descriptioneventserviceimpl.py @@ -9,7 +9,7 @@ if TYPE_CHECKING: from ...mdib.descriptorcontainers import AbstractDescriptorContainer from ...mdib.statecontainers import AbstractStateContainer - from lxml import etree as etree_ + from sdc11073 import xml_utils class DescriptionEventService(DPWSPortTypeBase): port_type_name = PrefixesEnum.SDC.tag('DescriptionEventService') @@ -39,7 +39,7 @@ def send_descriptor_updates(self, updated: List[AbstractDescriptorContainer], subscription_mgr.send_to_subscribers(body_node, action, mdib_version_group, 'send_descriptor_updates') def mk_description_modification_report_body(self, mdib_version_group, updated, created, deleted, - updated_states) -> etree_.ElementBase: + updated_states) -> xml_utils.LxmlElement: # This method creates one ReportPart for every descriptor. # An optimization is possible by grouping all descriptors with the same parent handle into one ReportPart. # This is not implemented, and I think it is not needed. diff --git a/src/sdc11073/provider/porttypes/porttypebase.py b/src/sdc11073/provider/porttypes/porttypebase.py index 643715ff..8d7ce331 100644 --- a/src/sdc11073/provider/porttypes/porttypebase.py +++ b/src/sdc11073/provider/porttypes/porttypebase.py @@ -11,6 +11,7 @@ if TYPE_CHECKING: from ...pysoap.msgfactory import CreatedMessage from ...namespaces import PrefixNamespace + from sdc11073 import xml_utils msg_prefix = PrefixesEnum.MSG.prefix @@ -64,7 +65,7 @@ def actions(self): # just a shortcut def add_wsdl_port_type(self, parent_node): raise NotImplementedError - def _mk_port_type_node(self, parent_node: etree_.ElementBase, is_event_source: bool =False) -> etree_.ElementBase: + def _mk_port_type_node(self, parent_node: xml_utils.LxmlElement, is_event_source: bool =False) -> xml_utils.LxmlElement: """ Needed for wsdl message :param parent_node: where to add data :param is_event_source: true if port type provides notification @@ -189,7 +190,7 @@ def _handle_operation_request(self, request_data, request, set_response) -> Crea return self._sdc_device.msg_factory.mk_reply_soap_message(request_data, set_response) -def _mk_wsdl_operation(parent_node, operation_name, input_message_name, output_message_name) -> etree_.ElementBase: +def _mk_wsdl_operation(parent_node, operation_name, input_message_name, output_message_name) -> xml_utils.LxmlElement: elem = etree_.SubElement(parent_node, _wsdl_operation, attrib={'name': operation_name}) if input_message_name is not None: etree_.SubElement(elem, etree_.QName(_wsdl_ns, 'input'), @@ -202,10 +203,10 @@ def _mk_wsdl_operation(parent_node, operation_name, input_message_name, output_m return elem -def mk_wsdl_two_way_operation(parent_node: etree_.ElementBase, +def mk_wsdl_two_way_operation(parent_node: xml_utils.LxmlElement, operation_name: str, input_message_name: Optional[str] = None, - output_message_name: Optional[str] = None) -> etree_.ElementBase: + output_message_name: Optional[str] = None) -> xml_utils.LxmlElement: """ A helper for wsdl generation. A two-way-operation defines a 'normal' request and response operation. :param parent_node: info shall be added to this node @@ -222,9 +223,9 @@ def mk_wsdl_two_way_operation(parent_node: etree_.ElementBase, output_message_name=output_msg_name) -def mk_wsdl_one_way_operation(parent_node: etree_.ElementBase, +def mk_wsdl_one_way_operation(parent_node: xml_utils.LxmlElement, operation_name: str, - output_message_name: Optional[str] = None) -> etree_.ElementBase: + output_message_name: Optional[str] = None) -> xml_utils.LxmlElement: """ A helper for wsdl generation. A one-way-operation is a subscription. :param parent_node: info shall be added to this node diff --git a/src/sdc11073/provider/subscriptionmgr.py b/src/sdc11073/provider/subscriptionmgr.py index 22eeaf7f..77221f12 100644 --- a/src/sdc11073/provider/subscriptionmgr.py +++ b/src/sdc11073/provider/subscriptionmgr.py @@ -14,12 +14,13 @@ if TYPE_CHECKING: from ..dispatch import RequestData + from sdc11073 import xml_utils class BicepsSubscription(ActionBasedSubscription): """ This extends ActionBasedSubscription with the ability to send notifications. The class is used by ActionBasedSubscriptionsManager.""" - def send_notification_report(self, body_node: etree_.ElementBase, action: str): + def send_notification_report(self, body_node: xml_utils.LxmlElement, action: str): if not self.is_valid: return inf = HeaderInformationBlock(addr_to=self.notify_to_address, diff --git a/src/sdc11073/provider/subscriptionmgr_async.py b/src/sdc11073/provider/subscriptionmgr_async.py index 2af56a22..0186a4a1 100644 --- a/src/sdc11073/provider/subscriptionmgr_async.py +++ b/src/sdc11073/provider/subscriptionmgr_async.py @@ -28,6 +28,7 @@ from sdc11073.pysoap.msgfactory import MessageFactory from sdc11073.pysoap.msgreader import ReceivedMessage from sdc11073.pysoap.soapclientpool import SoapClientPool + from sdc11073 import xml_utils def _mk_dispatch_identifier(reference_parameters: list, path_suffix: str) -> tuple[str | None, str]: @@ -43,7 +44,7 @@ def _mk_dispatch_identifier(reference_parameters: list, path_suffix: str) -> tup class BicepsSubscriptionAsync(ActionBasedSubscription): """Async version of a single BICEPS subscription. It is used by BICEPSSubscriptionsManagerBaseAsync.""" - async def async_send_notification_report(self, body_node: etree_.ElementBase, action: str): + async def async_send_notification_report(self, body_node: xml_utils.LxmlElement, action: str): """Send notification to subscriber.""" if not self.is_valid or self.unsubscribed_at is not None: return @@ -194,7 +195,7 @@ def _end_all_subscriptions(self, send_subscription_end: bool): apply_map(lambda subscription: subscription.close_by_subscription_manager(), self._subscriptions.objects) self._subscriptions.clear() - def send_to_subscribers(self, payload: MessageType | etree_.ElementBase, + def send_to_subscribers(self, payload: MessageType | xml_utils.LxmlElement, action: str, mdib_version_group: MdibVersionGroup, what: str): @@ -231,7 +232,7 @@ def send_to_subscribers(self, payload: MessageType | etree_.ElementBase, '{}: _send_to_subscribers {} returned {}', what, subscribers[counter], element) async def _async_send_notification_report(self, subscription: BicepsSubscriptionAsync, - body_node: etree_.ElementBase, + body_node: xml_utils.LxmlElement, action: str): try: self._logger.debug('send notification report {} to {}', action, subscription) # noqa: PLE1205 diff --git a/src/sdc11073/provider/subscriptionmgr_base.py b/src/sdc11073/provider/subscriptionmgr_base.py index 6b6c46d9..203a2c6d 100644 --- a/src/sdc11073/provider/subscriptionmgr_base.py +++ b/src/sdc11073/provider/subscriptionmgr_base.py @@ -12,7 +12,7 @@ from lxml import etree as etree_ -from sdc11073 import loghelper, multikey, observableproperties +from sdc11073 import loghelper, multikey, observableproperties, xml_utils from sdc11073.etc import apply_map from sdc11073.pysoap.soapclient import HTTPReturnCodeError from sdc11073.pysoap.soapenvelope import Fault, faultcodeEnum @@ -224,7 +224,7 @@ def get_roundtrip_stats(self) -> RoundTripData: return RoundTripData(None, None) def _mk_notification_message(self, header_info: HeaderInformationBlock, - body_node: etree_.ElementBase) -> CreatedMessage: + body_node: xml_utils.LxmlElement) -> CreatedMessage: return self._msg_factory.mk_soap_message_etree_payload(header_info, body_node) @@ -432,7 +432,7 @@ def _get_subscription_for_request(self, request_data: RequestData) -> Subscripti request_data.message_data.q_name, dispatch_identifier, request_data.peer_name) return subscription - def send_to_subscribers(self, payload: MessageType | etree_.ElementBase, + def send_to_subscribers(self, payload: MessageType | xml_utils.LxmlElement, action: str, mdib_version_group, what: str): @@ -456,7 +456,7 @@ def send_to_subscribers(self, payload: MessageType | etree_.ElementBase, except: raise - def _send_notification_report(self, subscription, body_node: etree_.ElementBase, action: str): + def _send_notification_report(self, subscription, body_node: xml_utils.LxmlElement, action: str): try: subscription.send_notification_report(body_node, action) except ConnectionRefusedError as ex: diff --git a/src/sdc11073/pysoap/msgfactory.py b/src/sdc11073/pysoap/msgfactory.py index 66648142..4a4f3c69 100644 --- a/src/sdc11073/pysoap/msgfactory.py +++ b/src/sdc11073/pysoap/msgfactory.py @@ -14,6 +14,7 @@ from ..xml_types.msg_types import MessageType from ..definitions_base import BaseDefinitions from ..namespaces import PrefixNamespace + from sdc11073 import xml_utils class CreatedMessage: @@ -103,7 +104,7 @@ def mk_soap_message(self, def mk_soap_message_etree_payload(self, header_info: HeaderInformationBlock, - payload_element: Optional[etree_.ElementBase] = None): + payload_element: xml_utils.LxmlElement | None = None): nsh = self.ns_hlp my_ns_map = nsh.partial_map(nsh.S12, nsh.WSE, nsh.WSA) soap_envelope = Soap12Envelope(my_ns_map) diff --git a/src/sdc11073/pysoap/msgreader.py b/src/sdc11073/pysoap/msgreader.py index 60820d08..c4e1cb67 100644 --- a/src/sdc11073/pysoap/msgreader.py +++ b/src/sdc11073/pysoap/msgreader.py @@ -24,9 +24,10 @@ from sdc11073.mdib.descriptorcontainers import AbstractDescriptorProtocol from sdc11073.mdib.statecontainers import AbstractStateProtocol from sdc11073.namespaces import PrefixNamespace + from sdc11073 import xml_utils -def validate_node(node: etree_.ElementBase, xml_schema: etree_.XMLSchema, logger: LoggerAdapter): +def validate_node(node: xml_utils.LxmlElement, xml_schema: etree_.XMLSchema, logger: LoggerAdapter): """Let xml_schema instance validate the node.""" try: xml_schema.assertValid(node) @@ -41,7 +42,7 @@ def validate_node(node: etree_.ElementBase, xml_schema: etree_.XMLSchema, logger raise ValidationError(reason='document invalid', soap_fault=fault) from ex -def _get_text(node: etree_.ElementBase, q_name: etree_.QName) -> str | None: +def _get_text(node: xml_utils.LxmlElement, q_name: etree_.QName) -> str | None: if node is None: return None tmp = node.find(q_name) @@ -64,7 +65,7 @@ class MdibVersionGroupReader: instance_id: int | None @classmethod - def from_node(cls, node: etree_.ElementBase) -> MdibVersionGroupReader: + def from_node(cls, node: xml_utils.LxmlElement) -> MdibVersionGroupReader: """Construct from a node with version attributes.""" mdib_version = int(node.get('MdibVersion', '0')) sequence_id = node.get('SequenceId') @@ -163,7 +164,7 @@ def read_get_mdib_response(self, received_message_data: ReceivedMessage) -> tupl mdib_node = received_message_data.p_msg.msg_node[0] return self.read_get_mdib_payload(mdib_node) - def read_get_mdib_payload(self, mdib_node: etree_.ElementBase) -> tuple[ + def read_get_mdib_payload(self, mdib_node: xml_utils.LxmlElement) -> tuple[ list[AbstractDescriptorProtocol], list[AbstractStateProtocol]]: """Return list of all descriptors and states in mdib.""" descriptors = [] @@ -184,7 +185,7 @@ def read_mdib_xml(self, xml_text: bytes) -> tuple[list[AbstractDescriptorProtoco return self.read_get_mdib_payload(payload[0]) return self.read_get_mdib_payload(payload) - def read_xml_text(self, xml_text: bytes) -> etree_.ElementBase: + def read_xml_text(self, xml_text: bytes) -> xml_utils.LxmlElement: """Parse imput, return a node.""" parser = etree_.ETCompatXMLParser(resolve_entities=False) try: @@ -195,10 +196,10 @@ def read_xml_text(self, xml_text: bytes) -> etree_.ElementBase: self._validate_node(node) return node - def _read_md_description_node(self, md_description_node: etree_.ElementBase) -> list[AbstractDescriptorProtocol]: + def _read_md_description_node(self, md_description_node: xml_utils.LxmlElement) -> list[AbstractDescriptorProtocol]: descriptions = [] - def add_children(parent_node: etree_.ElementBase): + def add_children(parent_node: xml_utils.LxmlElement): p_handle = parent_node.get('Handle') for child_node in parent_node: if child_node.get('Handle') is not None: @@ -214,7 +215,7 @@ def add_children(parent_node: etree_.ElementBase): add_children(mds_node) return descriptions - def _read_md_state_node(self, md_state_node: etree_.ElementBase) -> list[AbstractStateProtocol]: + def _read_md_state_node(self, md_state_node: xml_utils.LxmlElement) -> list[AbstractStateProtocol]: """Parse a GetMdStateResponse or the MdState part of GetMdibResponse.""" state_containers = [] all_state_nodes = md_state_node.findall(self.pm_names.State) @@ -222,14 +223,14 @@ def _read_md_state_node(self, md_state_node: etree_.ElementBase) -> list[Abstrac state_containers.append(self._mk_state_container_from_node(state_node)) return state_containers - def _mk_descriptor_container_from_node(self, node: etree_.ElementBase, + def _mk_descriptor_container_from_node(self, node: xml_utils.LxmlElement, parent_handle: str | None) -> AbstractDescriptorProtocol: node_type = node.get(QN_TYPE) node_type = text_to_qname(node_type, node.nsmap) if node_type is not None else etree_.QName(node.tag) descr_cls = self.get_descriptor_container_class(node_type) return descr_cls.from_node(node, parent_handle) - def _mk_state_container_from_node(self, node: etree_.ElementBase, + def _mk_state_container_from_node(self, node: xml_utils.LxmlElement, forced_type: etree_.QName | None = None) -> AbstractStateProtocol: """Create a state container from a node. @@ -256,7 +257,7 @@ def _mk_state_container_from_node(self, node: etree_.ElementBase, state.node = node return state - def _validate_node(self, node: etree_.ElementBase): + def _validate_node(self, node: xml_utils.LxmlElement): if self._validate: validate_node(node, self._xml_schema, self._logger) diff --git a/src/sdc11073/pysoap/soapenvelope.py b/src/sdc11073/pysoap/soapenvelope.py index 696d1a1d..370f429d 100644 --- a/src/sdc11073/pysoap/soapenvelope.py +++ b/src/sdc11073/pysoap/soapenvelope.py @@ -12,6 +12,7 @@ if TYPE_CHECKING: from sdc11073.xml_types.addressing_types import HeaderInformationBlock + from sdc11073 import xml_utils CHECK_NAMESPACES = False # can be used to enable additional checks for too many namespaces or undefined namespaces @@ -44,7 +45,7 @@ def __init__(self, ns_map: dict | None = None): self._nsmap[prefix.prefix] = prefix.namespace self.header_info_block = None - def add_header_element(self, element: etree_.ElementBase): + def add_header_element(self, element: xml_utils.LxmlElement): """Add element to soap header.""" self._header_nodes.append(element) @@ -53,12 +54,12 @@ def set_header_info_block(self, header_info_block: HeaderInformationBlock): self.header_info_block = header_info_block @property - def payload_element(self) -> etree_.ElementBase: + def payload_element(self) -> xml_utils.LxmlElement: """Get payload of soap envelope ( child node of Body element).""" return self._payload_element @payload_element.setter - def payload_element(self, element: etree_.ElementBase): + def payload_element(self, element: xml_utils.LxmlElement): if self._payload_element is not None: raise ApiUsageError('there can be only one body object') self._payload_element = element @@ -69,7 +70,7 @@ def nsmap(self) -> dict: return self._nsmap @property - def header_nodes(self) -> list[etree_.ElementBase]: + def header_nodes(self) -> list[xml_utils.LxmlElement]: """Get the list of header nodes.""" return self._header_nodes @@ -79,11 +80,11 @@ class ReceivedSoapMessage: __slots__ = ('msg_node', 'msg_name', 'raw_data', 'header_info_block', '_doc_root', 'header_node', 'body_node') - def __init__(self, xml_text: bytes, doc_root: etree_.ElementBase): + def __init__(self, xml_text: bytes, doc_root: xml_utils.LxmlElement): self.raw_data = xml_text - self._doc_root: etree_.ElementBase = doc_root - self.header_node: etree_.ElementBase = self._doc_root.find(ns_hlp.S12.tag('Header')) - self.body_node: etree_.ElementBase = self._doc_root.find(ns_hlp.S12.tag('Body')) + self._doc_root: xml_utils.LxmlElement = doc_root + self.header_node: xml_utils.LxmlElement = self._doc_root.find(ns_hlp.S12.tag('Header')) + self.body_node: xml_utils.LxmlElement = self._doc_root.find(ns_hlp.S12.tag('Body')) self.header_info_block: HeaderInformationBlock | None = None try: self.msg_node = self.body_node[0] diff --git a/src/sdc11073/xml_types/addressing_types.py b/src/sdc11073/xml_types/addressing_types.py index c4902baf..f0004f92 100644 --- a/src/sdc11073/xml_types/addressing_types.py +++ b/src/sdc11073/xml_types/addressing_types.py @@ -10,7 +10,8 @@ from .basetypes import ElementWithText, XMLTypeBase if TYPE_CHECKING: - from lxml.etree import Element, ElementBase, QName + from lxml.etree import QName + from sdc11073 import xml_utils _is_reference_parameter = nsh.WSA.tag('IsReferenceParameter') @@ -65,7 +66,7 @@ def __init__(self, action: str | None = None, addr_to: str | None = None, relates_to: str | None = None, addr_from: str | None = None, - reference_parameters: list[ElementBase] | None = None, + reference_parameters: list[xml_utils.LxmlElement] | None = None, relationship_type: QName | None = None): super().__init__() if action is not None: @@ -104,7 +105,7 @@ def mk_reply_header_block(self, action: str | None = None, reply_address.Action = MustUnderStandTextElement(action) return reply_address - def as_etree_node(self, q_name: QName, ns_map: dict[str, str]) -> ElementBase: + def as_etree_node(self, q_name: QName, ns_map: dict[str, str]) -> xml_utils.LxmlElement: """Create etree Element form instance data.""" node = super().as_etree_node(q_name, ns_map) for param in self.reference_parameters: @@ -114,7 +115,7 @@ def as_etree_node(self, q_name: QName, ns_map: dict[str, str]) -> ElementBase: return node @classmethod - def from_node(cls, node: ElementBase) -> HeaderInformationBlock: + def from_node(cls, node: xml_utils.LxmlElement) -> HeaderInformationBlock: """Create HeaderInformationBlock from etree element.""" obj = cls() obj.update_from_node(node) diff --git a/src/sdc11073/xml_types/basetypes.py b/src/sdc11073/xml_types/basetypes.py index 8d765fc1..e6db58f0 100644 --- a/src/sdc11073/xml_types/basetypes.py +++ b/src/sdc11073/xml_types/basetypes.py @@ -4,12 +4,15 @@ import inspect import traceback from math import isclose -from typing import List +from typing import List, TYPE_CHECKING from lxml import etree as etree_ from .xml_structure import NodeStringProperty, NodeTextListProperty +if TYPE_CHECKING: + from sdc11073 import xml_utils + class StringEnum(str, enum.Enum): @@ -37,7 +40,7 @@ def as_etree_node(self, q_name: etree_.QName, ns_map: dict): self.update_node(node) return node - def update_node(self, node: etree_.ElementBase): + def update_node(self, node: xml_utils.LxmlElement): for prop_name, prop in self.sorted_container_properties(): try: prop.update_xml_value(self, node) @@ -46,7 +49,7 @@ def update_node(self, node: etree_.ElementBase): raise ValueError( f'In {self.__class__.__name__}.{prop_name}, {str(prop)} could not update: {traceback.format_exc()}') from ex - def update_from_node(self, node: etree_.ElementBase): + def update_from_node(self, node: xml_utils.LxmlElement): for dummy, prop in self.sorted_container_properties(): prop.update_from_node(self, node) diff --git a/src/sdc11073/xml_types/pm_types.py b/src/sdc11073/xml_types/pm_types.py index c0096264..6809f089 100644 --- a/src/sdc11073/xml_types/pm_types.py +++ b/src/sdc11073/xml_types/pm_types.py @@ -16,7 +16,8 @@ from .basetypes import StringEnum, XMLTypeBase if TYPE_CHECKING: - from lxml.etree import ElementBase, QName + from lxml.etree import QName + from sdc11073 import xml_utils from sdc11073.xml_types.isoduration import DateTypeUnion, DurationType @@ -284,7 +285,7 @@ class PropertyBasedPMType(XMLTypeBase): """ @classmethod - def value_class_from_node(cls, node: ElementBase) -> type[PropertyBasedPMType]: + def value_class_from_node(cls, node: xml_utils.LxmlElement) -> type[PropertyBasedPMType]: """If node has a xsi:Type attribute, return the class that reflects that type, else cls.""" xsi_type_str = node.get(QN_TYPE) if xsi_type_str is None: @@ -317,7 +318,7 @@ def __init__(self, text: str, # noqa: PLR0913 self.TextWidth = text_width @classmethod - def from_node(cls, node: ElementBase) -> LocalizedText: + def from_node(cls, node: xml_utils.LxmlElement) -> LocalizedText: """Construct class from a node.""" obj = cls('') obj.update_from_node(node) @@ -356,7 +357,7 @@ def __post_init__(self): raise TypeError('code must be a string!') @classmethod - def from_node(cls, node: ElementBase) -> Coding: + def from_node(cls, node: xml_utils.LxmlElement) -> Coding: """Construct class from a node.""" code = node.get('Code') coding_system = node.get('CodingSystem', DEFAULT_CODING_SYSTEM) @@ -398,7 +399,7 @@ def __repr__(self) -> str: f'codingsystemversion={self.CodingSystemVersion})') @classmethod - def from_node(cls, node: ElementBase) -> Translation: + def from_node(cls, node: xml_utils.LxmlElement) -> Translation: """Construct class from a node.""" obj = cls('') obj.update_from_node(node) @@ -482,7 +483,7 @@ def is_equivalent(self, other: Coding | CodedValue) -> bool: return have_matching_codes(self, other) @classmethod - def from_node(cls, node: ElementBase) -> CodedValue: + def from_node(cls, node: xml_utils.LxmlElement) -> CodedValue: """Construct class from a node.""" obj = cls('') obj.update_from_node(node) @@ -626,7 +627,7 @@ def __init__(self, self.MeasurementUnit = unit @classmethod - def from_node(cls, node: ElementBase) -> Measurement: + def from_node(cls, node: xml_utils.LxmlElement) -> Measurement: """Construct class from a node.""" obj = cls(None, None) obj.update_from_node(node) @@ -1402,7 +1403,7 @@ def __init__(self, method: RetrievabilityMethod, update_period: DurationType | N self.UpdatePeriod = update_period @classmethod - def from_node(cls, node: ElementBase) -> RetrievabilityInfo: + def from_node(cls, node: xml_utils.LxmlElement) -> RetrievabilityInfo: """Construct class from a node.""" obj = cls(RetrievabilityMethod.GET) # any allowed value, will be overwritten in update_node obj.update_from_node(node) diff --git a/src/sdc11073/xml_types/xml_structure.py b/src/sdc11073/xml_types/xml_structure.py index d796d3a5..6c81583d 100644 --- a/src/sdc11073/xml_types/xml_structure.py +++ b/src/sdc11073/xml_types/xml_structure.py @@ -32,6 +32,7 @@ StringConverter, TimestampConverter, ) +from sdc11073 import xml_utils if TYPE_CHECKING: from collections.abc import Iterable, Sequence @@ -156,7 +157,7 @@ def init_instance_data(self, instance: Any): setattr(instance, self._local_var_name, copy.deepcopy(self._default_py_value)) @abstractmethod - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Update node with current data from instance. This method is used internally and should not be called by application. @@ -166,7 +167,7 @@ def update_xml_value(self, instance: Any, node: etree_.ElementBase): """ @abstractmethod - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase): + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement): """Read data from node. This method is used internally and should not be called by application. @@ -175,7 +176,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase): :return: value """ - def update_from_node(self, instance: Any, node: etree_.ElementBase): + def update_from_node(self, instance: Any, node: xml_utils.LxmlElement): """Update instance data with data from node. This method is used internally and should not be called by application. @@ -216,11 +217,11 @@ def __init__(self, attribute_name: str, # noqa: PLR0913 self._attribute_name = attribute_name def get_py_value_from_node(self, instance: Any, # noqa: ARG002 - node: etree_.ElementBase | None) -> Any: + node: xml_utils.LxmlElement | None) -> Any: xml_value = None if node is None else node.attrib.get(self._attribute_name) return None if xml_value is None else self._converter.to_py(xml_value) - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -268,9 +269,9 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913 self._sub_element_name = sub_element_name @staticmethod - def _get_element_by_child_name(node: etree_.ElementBase, + def _get_element_by_child_name(node: xml_utils.LxmlElement, sub_element_name: etree_.QName | None, - create_missing_nodes: bool) -> etree_.ElementBase: + create_missing_nodes: bool) -> xml_utils.LxmlElement: if sub_element_name is None: return node sub_node = node.find(sub_element_name) @@ -280,7 +281,7 @@ def _get_element_by_child_name(node: etree_.ElementBase, sub_node = etree_.SubElement(node, sub_element_name) # create this node return sub_node - def remove_sub_element(self, node: etree_.ElementBase): + def remove_sub_element(self, node: xml_utils.LxmlElement): if self._sub_element_name is None: return sub_node = node.find(self._sub_element_name) @@ -369,7 +370,7 @@ def __init__(self, attribute_name: str, super().__init__(attribute_name, value_converter=TimestampConverter, default_py_value=None, is_optional=is_optional) - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" setattr(instance, self._local_var_name, time.time()) super().update_xml_value(instance, node) @@ -467,11 +468,11 @@ def __init__(self, attribute_name: str, default_py_value=default_py_value, implied_py_value=implied_py_value, is_optional=is_optional) def get_py_value_from_node(self, instance: Any, # noqa: ARG002 - node: etree_.ElementBase | None) -> Any: + node: xml_utils.LxmlElement | None) -> Any: xml_value = None if node is None else node.attrib.get(self._attribute_name) return None if xml_value is None else text_to_qname(xml_value, node.nsmap) - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -521,14 +522,14 @@ def init_instance_data(self, instance: Any): setattr(instance, self._local_var_name, []) def get_py_value_from_node(self, instance: Any, # noqa: ARG002 - node: etree_.ElementBase | None) -> list[Any]: + node: xml_utils.LxmlElement | None) -> list[Any]: xml_value = None if node is None else node.attrib.get(self._attribute_name) if xml_value is not None: split_result = xml_value.split(' ') return [self._converter.elem_to_py(val) for val in split_result if val] return [] - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): try: py_value = getattr(instance, self._local_var_name) except AttributeError: @@ -607,7 +608,7 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913 is_optional) self._min_length = min_length - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node. :return: None if the element was not found, else result of converter. @@ -618,7 +619,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any return None # element was not found, return None return self._converter.to_py(sub_node.text) - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -702,7 +703,7 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913 is_optional, min_length=1) self.enum_cls = enum_cls - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" try: sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False) @@ -713,7 +714,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any except ElementNotFoundError: return self._default_py_value - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -764,7 +765,7 @@ def __init__(self, sub_element_name: etree_.QName | None, super().__init__(sub_element_name, ClassCheckConverter(etree_.QName), default_py_value, is_optional=is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" try: sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False) @@ -776,7 +777,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any pass return self._default_py_value - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -802,7 +803,7 @@ def update_xml_value(self, instance: Any, node: etree_.ElementBase): sub_node.text = value -def _compare_extension(left: etree_.ElementBase, right: etree_.ElementBase) -> bool: +def _compare_extension(left: xml_utils.LxmlElement, right: xml_utils.LxmlElement) -> bool: # xml comparison try: if left.tag != right.tag: # compare expanded names @@ -824,9 +825,9 @@ def _compare_extension(left: etree_.ElementBase, right: etree_.ElementBase) -> b return all(map(_compare_extension, left_children, right_children)) # compare children but keep order -class ExtensionLocalValue(list[etree_.ElementBase]): +class ExtensionLocalValue(list[xml_utils.LxmlElement]): - compare_method: Callable[[etree_.ElementBase, etree_.ElementBase], bool] = _compare_extension + compare_method: Callable[[xml_utils.LxmlElement, xml_utils.LxmlElement], bool] = _compare_extension """may be overwritten by user if a custom comparison behaviour is required""" def __eq__(self, other: Sequence) -> bool: @@ -866,7 +867,7 @@ def __get__(self, instance, owner): # noqa: ANN001 setattr(instance, self._local_var_name, value) return value - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: """Read value from node.""" try: extension_nodes = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False) @@ -874,7 +875,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any return ExtensionLocalValue() return ExtensionLocalValue(extension_nodes[:]) - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node. The Extension Element is only added if there is at least one element available in local list. @@ -886,8 +887,7 @@ def update_xml_value(self, instance: Any, node: etree_.ElementBase): if extension_local_value is None or len(extension_local_value) == 0: return # nothing to add sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=True) - - sub_node.extend(copy.copy(x) for x in extension_local_value) + sub_node.extend(xml_utils.copy_node_wo_parent(x) for x in extension_local_value) class AnyEtreeNodeProperty(_ElementBase): @@ -897,7 +897,7 @@ def __init__(self, sub_element_name: etree_.QName | None, is_optional: bool = Fa super().__init__(sub_element_name, NullConverter, default_py_value=None, is_optional=is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" try: sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False) @@ -905,7 +905,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any return None return sub_node[:] # all children - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -939,7 +939,7 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913 is_optional) self.value_class = value_class - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" value = self._default_py_value try: @@ -950,7 +950,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any pass return value - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -992,7 +992,7 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913 self._cls_getter = cls_getter self._ns_helper = ns_helper - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" value = self._default_py_value try: @@ -1008,7 +1008,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any pass return value - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1056,7 +1056,7 @@ def __init__(self, sub_element_name: etree_.QName | None, super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)), is_optional=is_optional) self.value_class = value_class - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" objects = [] try: @@ -1069,7 +1069,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any except ElementNotFoundError: return objects - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1113,7 +1113,7 @@ def __init__(self, sub_element_name: etree_.QName | None, # noqa: PLR0913 self._cls_getter = cls_getter self._ns_helper = ns_helper - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" objects = [] try: @@ -1131,7 +1131,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any except ElementNotFoundError: return objects - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1165,7 +1165,7 @@ def __init__(self, sub_element_name: etree_.QName | None, is_optional: bool = True): super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)), is_optional=is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" objects = [] try: @@ -1176,7 +1176,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any except ElementNotFoundError: return objects - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1231,7 +1231,7 @@ def __init__(self, sub_element_name: etree_.QName | None, default_py_value=default_py_value, value_class=value_class) - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1251,14 +1251,14 @@ def __set__(self, instance: Any, py_value: Any): class AnyEtreeNodeListProperty(_ElementListProperty): - """class represents a list of etree_.Element.""" + """class represents a list of lxml elements.""" - def __init__(self, sub_element_name: etree_.QName | None, - is_optional: bool = True): - value_class = etree_._Element # noqa: SLF001 - super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)), is_optional=is_optional) + def __init__(self, sub_element_name: etree_.QName | None, is_optional: bool = True): + super().__init__(sub_element_name, + ListConverter(ClassCheckConverter(xml_utils.LxmlElement)), + is_optional=is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" objects = [] try: @@ -1269,7 +1269,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any except ElementNotFoundError: return objects - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1297,7 +1297,7 @@ def __init__(self, sub_element_name: etree_.QName | None, super().__init__(sub_element_name, ListConverter(ClassCheckConverter(value_class)), is_optional=is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" try: sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False) @@ -1307,7 +1307,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any pass return self._default_py_value - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1343,7 +1343,7 @@ def __init__(self, sub_element_name: etree_.QName | None, super().__init__(sub_element_name, ListConverter(ClassCheckConverter(etree_.QName)), is_optional=is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" result = [] try: @@ -1358,7 +1358,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any pass return self._default_py_value or result - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) @@ -1417,7 +1417,7 @@ def __init__(self, sub_element_name: etree_.QName | None, super().__init__(sub_element_name, ClassCheckConverter(datetime, date), default_py_value, implied_py_value, is_optional) - def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any: # noqa: ARG002 + def get_py_value_from_node(self, instance: Any, node: xml_utils.LxmlElement) -> Any: # noqa: ARG002 """Read value from node.""" try: sub_node = self._get_element_by_child_name(node, self._sub_element_name, create_missing_nodes=False) @@ -1428,7 +1428,7 @@ def get_py_value_from_node(self, instance: Any, node: etree_.ElementBase) -> Any pass return None - def update_xml_value(self, instance: Any, node: etree_.ElementBase): + def update_xml_value(self, instance: Any, node: xml_utils.LxmlElement): """Write value to node.""" try: py_value = getattr(instance, self._local_var_name) diff --git a/src/sdc11073/xml_utils.py b/src/sdc11073/xml_utils.py new file mode 100644 index 00000000..a286f4d4 --- /dev/null +++ b/src/sdc11073/xml_utils.py @@ -0,0 +1,57 @@ +"""Module containing utilities and helper methods regarding xml.""" + +import copy +import sys +from typing import Callable + +from lxml.etree import Element, _Element + +if sys.version_info >= (3, 10): + from typing import TypeAlias + + LxmlElement: TypeAlias = _Element +else: + LxmlElement = _Element + + +def copy_element(node: LxmlElement, method: Callable[[LxmlElement], LxmlElement] = copy.deepcopy) -> LxmlElement: + """Copy and preserve complete namespace. + + :param node: node to be copied + :param method: method that creates a duplication of the root node + :return: new node + """ + # walk from target to root + current = node + ns_map_list: list[dict[str, str]] = [] # saves all namespaces + while current is not None: + ns_map_list.append({k: v for k, v in current.nsmap.items() if k}) # filter for default namespace + current = current.getparent() + + # create new instance + root_tree = node.getroottree() + current = method(root_tree.getroot()) + x_path_steps = root_tree.getpath(node).split('/')[1:] + assert len(x_path_steps) == len(ns_map_list) + + # walk from root to target + ns_map_list.reverse() + for i, step in enumerate(x_path_steps): + x_path_elements = current.xpath(f'/{step}' if i == 0 else step, namespaces=ns_map_list[i]) + assert len(x_path_elements) == 1 + current = x_path_elements[0] + return current + + +def copy_node_wo_parent(node: LxmlElement, method: Callable[[LxmlElement], LxmlElement] = copy.deepcopy) -> LxmlElement: + """Copy node but only keep relevant information and no parent. + + :param node: node to be copied + :param method: method that copies an etree element + :return: new node + """ + new_node = Element(node.tag, attrib=node.attrib, nsmap=node.nsmap) + new_node.text = node.text + new_node.tail = node.tail + new_node.extend(method(child) for child in node) + return new_node diff --git a/tests/test_xml_utils.py b/tests/test_xml_utils.py new file mode 100644 index 00000000..b05b549d --- /dev/null +++ b/tests/test_xml_utils.py @@ -0,0 +1,276 @@ +import unittest + +import lxml + +from sdc11073 import xml_utils + + +class TestXmlParsing(unittest.TestCase): + xml_to_be_parsed = (b""" + + + https://127.0.0.1:51341/ + + http://standards.ieee.org/downloads/11073/11073-20701-2018/StateEventService/EpisodicAlertReport + + urn:uuid:095ea71f-d5cb-4a6f-a12b-19a84933efef + + urn:uuid:75793341-2aa2-4a33-adfe-c158a0ad2982 + + + + + + + + + +""", + b""" + + + https://127.0.0.1:51341/ + + http://standards.ieee.org/downloads/11073/11073-20701-2018/StateEventService/EpisodicAlertReport + + urn:uuid:095ea71f-d5cb-4a6f-a12b-19a84933efef + + urn:uuid:75793341-2aa2-4a33-adfe-c158a0ad2982 + + + + + + + + + + + + + + + + mds0 + + + + + + + + +""", + b""" + + + https://127.0.0.1:51341/ + + http://standards.ieee.org/downloads/11073/11073-20701-2018/StateEventService/EpisodicAlertReport + + urn:uuid:095ea71f-d5cb-4a6f-a12b-19a84933efef + + urn:uuid:75793341-2aa2-4a33-adfe-c158a0ad2982 + + + + + + + + + +""", + b""" + + + https://127.0.0.1:51341/ + + http://standards.ieee.org/downloads/11073/11073-20701-2018/StateEventService/EpisodicAlertReport + + urn:uuid:095ea71f-d5cb-4a6f-a12b-19a84933efef + + urn:uuid:75793341-2aa2-4a33-adfe-c158a0ad2982 + + + + + + + + + + + mds0 + + + + + + + + +""", + b""" + + + https://127.0.0.1:43165/ + + urn:uuid:urn:uuid:0561269f-45ad-4e69-9e3c-9954b7875605 + + http://standards.ieee.org/downloads/11073/11073-20701-2018/WaveformService/WaveformStream + + urn:uuid:be60a76f-ad2d-4cb4-88e2-4e983baea5ac + + + + + + + + + + + + + + + + + + + + +""", + b""" + + + https://127.0.0.1:51341/ + + http://standards.ieee.org/downloads/11073/11073-20701-2018/StateEventService/EpisodicAlertReport + + urn:uuid:095ea71f-d5cb-4a6f-a12b-19a84933efef + + urn:uuid:75793341-2aa2-4a33-adfe-c158a0ad2982 + + + + + + + + + + + + +""") + + def setUp(self) -> None: + self.xml_to_be_tested = [lxml.etree.fromstring(raw_xml)[1] for raw_xml in self.xml_to_be_parsed] + + def _compare_nodes(self, expected: xml_utils.LxmlElement, actual: xml_utils.LxmlElement): + self.assertNotEqual(id(expected), id(actual)) + self.assertEqual(expected.tag, actual.tag) + self.assertEqual(expected.text, actual.text) + self.assertEqual(expected.tail, actual.tail) + self.assertDictEqual(dict(expected.attrib), dict(actual.attrib)) # make order of attributes irrelevant + self.assertDictEqual(expected.nsmap, actual.nsmap) + self.assertEqual(len(expected), len(actual)) + for c1, c2 in zip(expected, actual): + self._compare_nodes(c1, c2) + + def test_copy_full_node(self): + for soap_body in self.xml_to_be_tested: + for report in soap_body: + new_report = xml_utils.copy_element(report) + self._compare_nodes(report.getroottree().getroot(), new_report.getroottree().getroot()) + + def test_copy_node_wo_parent(self): + for soap_body in self.xml_to_be_tested: + for report in soap_body: + new_report = xml_utils.copy_node_wo_parent(report) + self.assertEqual(new_report.getparent(), None) + self._compare_nodes(report, new_report) + + def test_lxml_element_type(self): + self.assertEqual(lxml.etree._Element, xml_utils.LxmlElement) + parsed_xml = lxml.etree.fromstring(self.xml_to_be_parsed[0]) + self.assertEqual(type(parsed_xml), xml_utils.LxmlElement) + self.assertTrue(isinstance(parsed_xml, lxml.etree._Element)) + self.assertTrue(isinstance(parsed_xml, xml_utils.LxmlElement))